You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@quickstep.apache.org by jianqiao <gi...@git.apache.org> on 2017/02/05 05:03:11 UTC

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

GitHub user jianqiao opened a pull request:

    https://github.com/apache/incubator-quickstep/pull/179

    QUICKSTEP-70-71 Improve aggregation performance

    This PR implements two features that improve aggregation performance:
    1. Adds `CollisionFreeVectorTable` to support specialized high-performance aggregation.
    2. Adds support for aggregation copy elision that we only materialize intermediate results for non-trivial expressions.
    
    For feature 1, when the group-by attribute is a range-bounded single attribute of `INT` or `LONG` type. We can use a vector of type `std::vector<std::atomic<StateT>>` to store the aggregation states, where `StateT` is the aggregation state type (currently restricted to `LONG` and `DOUBLE`). Then during aggregation, for each tuple, we locate the aggregation state with the group-by key's value as index to the state vector, and concurrently update the state with C++'s atomic primitives.
    
    For feature 2, note that the current implementation of aggregation always creates a `ColumnVectorsValueAccessor` to store the results of ALL the input expressions. However, we can avoid the creation of a column vector (thus avoiding copying values into the column vector) if the aggregation is on a simple attribute, e.g. `SUM(x)`. Thus by PR, when performing aggregation we prepare two input `ValueAccessor`s: one BASE accessor that is created directly from the input relation's storage block, and one DERIVED accessor that is the temporary result `ColumnVectorsValueAccessor`. Each aggregation argument may be from the base accessor (meaning that it is a simple attribute) or from the derived accessor (meaning that it is a non-trivial expression that gets evaluated). The two accessors are then properly handled in aggregation handles and aggregation hash tables.
    
    **Main changes:**
    `expressions/aggregation`: Updated the aggregation handles to support copy elision. Also did some cleanups.
    `relational_operators`: Added `InitializeAggregationOperator` to support parallel initialization of the aggregation state (just `memset` the memory to 0) -- because it takes a relatively long time to do the initialization with single thread if the aggregation hash table is large.
    `storage`: Added `CollisionFreeVectorTable`. Renamed `FastHashTable` to `PackedPayloadHashTable`, made it support copy elision, and cleaned up the class to remove unused methods. Refactored `AggregationOperationState` to support copy elision and support the new aggregation. Moved aggregation code out of `StorageBlock`.
    
    This PR significantly improves some TPC-H queries' performance. For example, it improves TPC-H Q18 from ~27.5s to ~3.5s, with scale factor 100 on a cloudlab machine.
    
    Below shows the TPC-H performance (scale factor 100 on a cloudlab machine) with recently committed optimizations up to this point:
    
    |  **TPCH SF100** | **master (ms)** | **w/ optimizations (ms)** |
    |  ------ | ------ | ------ |
    |  Q01 | 13629 | 11221 |
    |  Q02 | 537 | 460 |
    |  Q03 | 4824 | 4124 |
    |  Q04 | 2185 | 2203 |
    |  Q05 | 5517 | 5282 |
    |  Q06 | 399 | 401 |
    |  Q07 | 18563 | 3456 |
    |  Q08 | 1746 | 899 |
    |  Q09 | 7247 | 5586 |
    |  Q10 | 6745 | 5665 |
    |  Q11 | 1053 | 247 |
    |  Q12 | 1713 | 1698 |
    |  Q13 | 22896 | 15582 |
    |  Q14 | 805 | 745 |
    |  Q15 | 897 | 431 |
    |  Q16 | 9942 | 9158 |
    |  Q17 | 1588 | 1117 |
    |  Q18 | 27459 | 3507 |
    |  Q19 | 1711 | 1609 |
    |  Q20 | 1204 | 1014 |
    |  Q21 | 8671 | 7886 |
    |  Q22 | 6178 | 724 |
    |  **Total** | **145509** | **83016** |

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/incubator-quickstep collision-free-agg

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-quickstep/pull/179.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #179
    
----
commit 68be4a614f55412632f13051295327fecba1fada
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Date:   2017-01-30T20:46:39Z

    - Adds CollisionFreeVectorTable to support specialized fast path aggregation for range-bounded single integer group-by key.
    - Supports copy elision for aggregation.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99517316
  
    --- Diff: storage/PackedPayloadHashTable.cpp ---
    @@ -0,0 +1,463 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/PackedPayloadHashTable.hpp"
    +
    +#include <algorithm>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstdlib>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageConstants.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/Type.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/Alignment.hpp"
    +#include "utility/Macros.hpp"
    +#include "utility/PrimeNumber.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +PackedPayloadHashTable::PackedPayloadHashTable(
    +    const std::vector<const Type *> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle *> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      num_handles_(handles.size()),
    +      handles_(handles),
    +      total_payload_size_(ComputeTotalPayloadSize(handles)),
    +      storage_manager_(storage_manager),
    +      kBucketAlignment(alignof(std::atomic<std::size_t>)),
    +      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
    +      key_manager_(key_types_, kValueOffset + total_payload_size_),
    +      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
    +  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
    +  for (const auto *handle : handles) {
    +    payload_offsets_.emplace_back(payload_offset_running_sum);
    +    payload_offset_running_sum += handle->getPayloadSize();
    +  }
    +
    +  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
    +  // init_payload to buckets if payload contains out of line data.
    +  init_payload_ =
    +      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
    +  DCHECK(init_payload_ != nullptr);
    +
    +  for (std::size_t i = 0; i < num_handles_; ++i) {
    +    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
    +  }
    +
    +  // Bucket size always rounds up to the alignment requirement of the atomic
    +  // size_t "next" pointer at the front or a ValueT, whichever is larger.
    +  //
    +  // Give base HashTable information about what key components are stored
    +  // inline from 'key_manager_'.
    +  setKeyInline(key_manager_.getKeyInline());
    +
    +  // Pick out a prime number of slots and calculate storage requirements.
    +  std::size_t num_slots_tmp =
    +      get_next_prime_number(num_entries * kHashTableLoadFactor);
    +  std::size_t required_memory =
    +      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
    +      (num_slots_tmp / kHashTableLoadFactor) *
    +          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
    +  std::size_t num_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(required_memory);
    +  if (num_storage_slots == 0) {
    +    FATAL_ERROR(
    +        "Storage requirement for SeparateChainingHashTable "
    +        "exceeds maximum allocation size.");
    +  }
    +
    +  // Get a StorageBlob to hold the hash table.
    +  const block_id blob_id =
    +      this->storage_manager_->createBlob(num_storage_slots);
    +  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
    +
    +  void *aligned_memory_start = this->blob_->getMemoryMutable();
    +  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
    +  if (align(alignof(Header),
    +            sizeof(Header),
    +            aligned_memory_start,
    +            available_memory) == nullptr) {
    +    // With current values from StorageConstants.hpp, this should be
    +    // impossible. A blob is at least 1 MB, while a Header has alignment
    +    // requirement of just kCacheLineBytes (64 bytes).
    +    FATAL_ERROR(
    +        "StorageBlob used to hold resizable "
    +        "SeparateChainingHashTable is too small to meet alignment "
    +        "requirements of SeparateChainingHashTable::Header.");
    +  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
    +    // This should also be impossible, since the StorageManager allocates slots
    +    // aligned to kCacheLineBytes.
    +    DEV_WARNING("StorageBlob memory adjusted by "
    +                << (num_storage_slots * kSlotSizeBytes - available_memory)
    +                << " bytes to meet alignment requirement for "
    +                << "SeparateChainingHashTable::Header.");
    +  }
    +
    +  // Locate the header.
    +  header_ = static_cast<Header *>(aligned_memory_start);
    +  aligned_memory_start =
    +      static_cast<char *>(aligned_memory_start) + sizeof(Header);
    +  available_memory -= sizeof(Header);
    +
    +  // Recompute the number of slots & buckets using the actual available memory.
    +  // Most likely, we got some extra free bucket space due to "rounding up" to
    +  // the storage blob's size. It's also possible (though very unlikely) that we
    +  // will wind up with fewer buckets than we initially wanted because of screwy
    +  // alignment requirements for ValueT.
    +  std::size_t num_buckets_tmp =
    +      available_memory /
    +      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
    +       key_manager_.getEstimatedVariableKeySize());
    +  num_slots_tmp =
    +      get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
    +  num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
    +  DEBUG_ASSERT(num_slots_tmp > 0);
    +  DEBUG_ASSERT(num_buckets_tmp > 0);
    +
    +  // Locate the slot array.
    +  slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
    +  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
    +                         sizeof(std::atomic<std::size_t>) * num_slots_tmp;
    +  available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
    +
    +  // Locate the buckets.
    +  buckets_ = aligned_memory_start;
    +  // Extra-paranoid: If ValueT has an alignment requirement greater than that
    +  // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
    +  // array.
    +  if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
    +      nullptr) {
    +    FATAL_ERROR(
    +        "StorageBlob used to hold resizable "
    +        "SeparateChainingHashTable is too small to meet "
    +        "alignment requirements of buckets.");
    +  } else if (buckets_ != aligned_memory_start) {
    +    DEV_WARNING(
    +        "Bucket array start position adjusted to meet alignment "
    +        "requirement for SeparateChainingHashTable's value type.");
    +    if (num_buckets_tmp * bucket_size_ > available_memory) {
    +      --num_buckets_tmp;
    +    }
    +  }
    +
    +  // Fill in the header.
    +  header_->num_slots = num_slots_tmp;
    +  header_->num_buckets = num_buckets_tmp;
    +  header_->buckets_allocated.store(0, std::memory_order_relaxed);
    +  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
    +  available_memory -= bucket_size_ * (header_->num_buckets);
    +
    +  // Locate variable-length key storage region, and give it all the remaining
    +  // bytes in the blob.
    +  key_manager_.setVariableLengthStorageInfo(
    +      static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
    +      available_memory,
    +      &(header_->variable_length_bytes_allocated));
    +}
    +
    +PackedPayloadHashTable::~PackedPayloadHashTable() {
    +  if (blob_.valid()) {
    +    const block_id blob_id = blob_->getID();
    +    blob_.release();
    +    storage_manager_->deleteBlockOrBlobFile(blob_id);
    +  }
    +  std::free(init_payload_);
    +}
    +
    +void PackedPayloadHashTable::clear() {
    +  const std::size_t used_buckets =
    +      header_->buckets_allocated.load(std::memory_order_relaxed);
    +  // Destroy existing values, if necessary.
    +  destroyPayload();
    +
    +  // Zero-out slot array.
    +  std::memset(
    +      slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
    +
    +  // Zero-out used buckets.
    +  std::memset(buckets_, 0x0, used_buckets * bucket_size_);
    +
    +  header_->buckets_allocated.store(0, std::memory_order_relaxed);
    +  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
    +  key_manager_.zeroNextVariableLengthKeyOffset();
    +}
    +
    +void PackedPayloadHashTable::destroyPayload() {
    +  const std::size_t num_buckets =
    +      header_->buckets_allocated.load(std::memory_order_relaxed);
    +  void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
    +  for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
    +    for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
    +      void *value_internal_ptr =
    +          static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
    +      handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
    +    }
    +    bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
    +  }
    +}
    +
    +bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
    +    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
    +    const std::vector<MultiSourceAttributeId> &key_attr_ids,
    +    const ValueAccessorMultiplexer &accessor_mux) {
    +  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
    +  ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
    +
    +  base_accessor->beginIterationVirtual();
    +  if (derived_accessor == nullptr) {
    +    return upsertValueAccessorCompositeKeyInternal<false>(
    +        argument_ids,
    +        key_attr_ids,
    +        base_accessor,
    +        nullptr);
    +  } else {
    +    DCHECK(derived_accessor->getImplementationType()
    +               == ValueAccessor::Implementation::kColumnVectors);
    +    derived_accessor->beginIterationVirtual();
    +    return upsertValueAccessorCompositeKeyInternal<true>(
    +        argument_ids,
    +        key_attr_ids,
    +        base_accessor,
    +        static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
    +  }
    +}
    +
    +void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
    +                                    const std::size_t extra_variable_storage,
    +                                    const std::size_t retry_num) {
    +  // A retry should never be necessary with this implementation of HashTable.
    +  // Separate chaining ensures that any resized hash table with more buckets
    +  // than the original table will be able to hold more entries than the
    +  // original.
    +  DEBUG_ASSERT(retry_num == 0);
    +
    +  SpinSharedMutexExclusiveLock<true> write_lock(this->resize_shared_mutex_);
    +
    +  // Recheck whether the hash table is still full. Note that multiple threads
    +  // might wait to rebuild this hash table simultaneously. Only the first one
    +  // should do the rebuild.
    +  if (!isFull(extra_variable_storage)) {
    +    return;
    +  }
    +
    +  // Approximately double the number of buckets and slots.
    +  //
    +  // TODO(chasseur): It may be worth it to more than double the number of
    +  // buckets here so that we can maintain a good, sparse fill factor for a
    +  // longer time as more values are inserted. Such behavior should take into
    +  // account kHashTableLoadFactor.
    +  std::size_t resized_num_slots = get_next_prime_number(
    +      (header_->num_buckets + extra_buckets / 2) * kHashTableLoadFactor * 2);
    +  std::size_t variable_storage_required =
    +      (resized_num_slots / kHashTableLoadFactor) *
    +      key_manager_.getEstimatedVariableKeySize();
    +  const std::size_t original_variable_storage_used =
    +      header_->variable_length_bytes_allocated.load(std::memory_order_relaxed);
    +  // If this resize was triggered by a too-large variable-length key, bump up
    +  // the variable-length storage requirement.
    +  if ((extra_variable_storage > 0) &&
    +      (extra_variable_storage + original_variable_storage_used >
    +       key_manager_.getVariableLengthKeyStorageSize())) {
    +    variable_storage_required += extra_variable_storage;
    +  }
    +
    +  const std::size_t resized_memory_required =
    +      sizeof(Header) + resized_num_slots * sizeof(std::atomic<std::size_t>) +
    +      (resized_num_slots / kHashTableLoadFactor) * bucket_size_ +
    +      variable_storage_required;
    +  const std::size_t resized_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(resized_memory_required);
    +  if (resized_storage_slots == 0) {
    --- End diff --
    
    Refactor using `CHECK`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505538
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    --- End diff --
    
    We have `partition_id` type defined in `CatalogTypedef.h`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99511761
  
    --- Diff: storage/AggregationOperationState.hpp ---
    @@ -156,6 +152,29 @@ class AggregationOperationState {
           const CatalogDatabaseLite &database);
     
       /**
    +   * @brief Get the number of partitions to be used for initializing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for initializing the aggregation.
    +   **/
    +  std::size_t getNumInitializationPartitions() const;
    +
    +  /**
    +   * @brief Get the number of partitions to be used for finalizing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for finalizing the aggregation.
    +   **/
    +  std::size_t getNumFinalizationPartitions() const;
    +
    +  /**
    +   * @brief Initialize the specified partition of this aggregation.
    +   *
    +   * @param partition_id ID of the partition to be initialized.
    +   */
    +  void initialize(const std::size_t partition_id);
    --- End diff --
    
    Use `partition_id` type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99517587
  
    --- Diff: storage/PackedPayloadHashTable.hpp ---
    @@ -0,0 +1,995 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
    +#define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
    +
    +#include <atomic>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstring>
    +#include <limits>
    +#include <vector>
    +
    +#include "catalog/CatalogTypedefs.hpp"
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableBase.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "storage/ValueAccessorUtil.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/TypedValue.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/HashPair.hpp"
    +#include "utility/Macros.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +class StorageManager;
    +class Type;
    +class ValueAccessor;
    +
    +/** \addtogroup Storage
    + *  @{
    + */
    +
    +/**
    + * @brief Aggregation hash table implementation in which the payload can be just
    + *        a bunch of bytes. This implementation is suitable for aggregation with
    + *        multiple aggregation handles (e.g. SUM, MAX, MIN etc).
    + *
    + * At present the hash table uses separate chaining to resolve collisions, i.e.
    + * Keys/values are stored in a separate region of memory from the base hash
    + * table slot array. Every bucket has a "next" pointer so that entries that
    + * collide (i.e. map to the same base slot) form chains of pointers with each
    + * other.
    + **/
    +class PackedPayloadHashTable : public AggregationStateHashTableBase {
    + public:
    +  /**
    +   * @brief Constructor.
    +   *
    +   * @param key_types A vector of one or more types (>1 indicates a composite
    +   *        key).
    +   * @param num_entries The estimated number of entries this hash table will
    +   *        hold.
    +   * @param handles The aggregation handles.
    +   * @param storage_manager The StorageManager to use (a StorageBlob will be
    +   *        allocated to hold this hash table's contents).
    +   **/
    +  PackedPayloadHashTable(
    +      const std::vector<const Type *> &key_types,
    +      const std::size_t num_entries,
    +      const std::vector<AggregationHandle *> &handles,
    +      StorageManager *storage_manager);
    +
    +  ~PackedPayloadHashTable() override;
    +
    +  /**
    +   * @brief Erase all entries in this hash table.
    +   *
    +   * @warning This method is not guaranteed to be threadsafe.
    +   **/
    +  void clear();
    +
    +  void destroyPayload() override;
    +
    +  /**
    +   * @brief Use aggregation handles to update (multiple) aggregation states in
    +   *        this hash table, with group-by keys and arguments drawn from the
    +   *        given ValueAccessors. New states are first inserted if not already
    +   *        present.
    +   *
    +   * @note This method is threadsafe with regard to other calls to
    +   *       upsertCompositeKey() and upsertValueAccessorCompositeKey().
    +   *
    +   * @param argument_ids The multi-source attribute IDs of each argument
    +   *        component to be read from \p accessor_mux.
    +   * @param key_ids The multi-source attribute IDs of each group-by key
    +   *        component to be read from \p accessor_mux.
    +   * @param accessor_mux A ValueAccessorMultiplexer object that contains the
    +   *        ValueAccessors which will be used to access keys. beginIteration()
    +   *        should be called on the accessors before calling this method.
    +   * @return True on success, false if upsert failed because there was not
    +   *         enough space to insert new entries for all the keys in accessor
    +   *         (note that some entries may still have been upserted, and
    +   *         accessors' iterations will be left on the first tuple which could
    +   *         not be inserted).
    +   **/
    +  bool upsertValueAccessorCompositeKey(
    +      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
    +      const std::vector<MultiSourceAttributeId> &key_ids,
    +      const ValueAccessorMultiplexer &accessor_mux) override;
    +
    +  /**
    +   * @return The ID of the StorageBlob used to store this hash table.
    +   **/
    +  inline block_id getBlobId() const {
    +    return blob_->getID();
    +  }
    +
    +  /**
    +   * @warning This method assumes that no concurrent calls to
    +   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
    +   *          taking place (i.e. that this HashTable is immutable for the
    +   *          duration of the call).
    +   *          Concurrent calls to getSingleCompositeKey(), forEach(), and
    +   *          forEachCompositeKey() are safe.
    +   *
    +   * @return The number of entries in this HashTable.
    +   **/
    +  inline std::size_t numEntries() const {
    +    return header_->buckets_allocated.load(std::memory_order_relaxed);
    +  }
    +
    +  /**
    +   * @brief Use aggregation handles to merge the given aggregation states into
    +   *        the aggregation states mapped to the given key. New states are first
    +   *        inserted if not already present.
    +   *
    +   * @warning The key must not be null.
    +   * @note This method is threadsafe with regard to other calls to
    +   *       upsertCompositeKey() and upsertValueAccessorCompositeKey().
    +   *
    +   * @param key The key.
    +   * @param source_state The source aggregation states to be merged into this
    +   *        hash table.
    +   * @return True on success, false if upsert failed because there was not
    +   *         enough space to insert a new entry in this hash table.
    +   **/
    +  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
    +                                 const std::uint8_t *source_state);
    +
    +  /**
    +   * @brief Apply a functor to an aggregation state mapped to the given key.
    +   *        First inserting a new state if one is not already present.
    +   *
    +   * @warning The key must not be null.
    +   * @note This method is threadsafe with regard to other calls to
    +   *       upsertCompositeKey() and upsertValueAccessorCompositeKey().
    +   *
    +   * @param key The key.
    +   * @param functor A pointer to a functor, which should provide a call
    +   *        operator which takes an aggregation state (of type std::uint8_t *)
    +   *        as an argument.
    +   * @param index The index of the target aggregation state among those states
    +   *        mapped to \p key.
    +   * @return True on success, false if upsert failed because there was not
    +   *         enough space to insert a new entry in this hash table.
    +   **/
    +  template <typename FunctorT>
    +  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
    +                                 FunctorT *functor,
    +                                 const std::size_t index);
    +
    +  /**
    +   * @brief Lookup a composite key against this hash table to find a matching
    +   *        entry.
    +   *
    +   * @warning The key must not be null.
    +   * @warning This method assumes that no concurrent calls to
    +   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
    +   *          taking place (i.e. that this HashTable is immutable for the
    +   *          duration of the call and as long as the returned pointer may be
    +   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
    +   *          forEach(), and forEachCompositeKey() are safe.
    +   *
    +   * @param key The key to look up.
    +   * @return The value of a matched entry if a matching key is found.
    +   *         Otherwise, return NULL.
    +   **/
    +  inline const std::uint8_t* getSingleCompositeKey(
    +      const std::vector<TypedValue> &key) const;
    +
    +  /**
    +   * @brief Lookup a composite key against this hash table to find a matching
    +   *        entry. Then return the aggregation state component with the
    +   *        specified index.
    +   *
    +   * @warning The key must not be null.
    +   * @warning This method assumes that no concurrent calls to
    +   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
    +   *          taking place (i.e. that this HashTable is immutable for the
    +   *          duration of the call and as long as the returned pointer may be
    +   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
    +   *          forEach(), and forEachCompositeKey() are safe.
    +   *
    +   * @param key The key to look up.
    +   * @param index The index of the target aggregation state among those states
    +   *        mapped to \p key.
    +   * @return The aggregation state of the specified index if a matching key is
    +   *         found. Otherwise, return NULL.
    +   **/
    +  inline const std::uint8_t* getSingleCompositeKey(
    +      const std::vector<TypedValue> &key,
    +      const std::size_t index) const;
    +
    +  /**
    +   * @brief Apply a functor to each (key, value) pair in this hash table.
    +   *
    +   * @warning This method assumes that no concurrent calls to
    +   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
    +   *          taking place (i.e. that this HashTable is immutable for the
    +   *          duration of the call and as long as the returned pointer may be
    +   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
    +   *          forEach(), and forEachCompositeKey() are safe.
    +   *
    +   * @param functor A pointer to a functor, which should provide a call operator
    +   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
    +   *        The call operator will be invoked once on each key, value pair in
    +   *        this hash table.
    +   * @return The number of key-value pairs visited.
    +   **/
    +  template <typename FunctorT>
    +  inline std::size_t forEach(FunctorT *functor) const;
    +
    +  /**
    +   * @brief Apply a functor to each (key, aggregation state) pair in this hash
    +   *        table, where the aggregation state is retrieved from the value
    +   *        that maps to the corresponding key with the specified index.
    +   *
    +   * @warning This method assumes that no concurrent calls to
    +   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
    +   *          taking place (i.e. that this HashTable is immutable for the
    +   *          duration of the call and as long as the returned pointer may be
    +   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
    +   *          forEach(), and forEachCompositeKey() are safe.
    +   *
    +   * @param functor A pointer to a functor, which should provide a call operator
    +   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
    +   *        The call operator will be invoked once on each (key, aggregation state)
    +   *        pair in this hash table.
    +   * @param index The index of the target aggregation state among those states
    +   *        mapped to \p key.
    +   * @return The number of key-value pairs visited.
    +   **/
    +  template <typename FunctorT>
    +  inline std::size_t forEach(FunctorT *functor, const int index) const;
    +
    +  /**
    +   * @brief Apply a functor to each key, value pair in this hash table.
    +   *        Composite key version.
    +   *
    +   * @warning This method assumes that no concurrent calls to
    +   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
    +   *          taking place (i.e. that this HashTable is immutable for the
    +   *          duration of the call and as long as the returned pointer may be
    +   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
    +   *          forEach(), and forEachCompositeKey() are safe.
    +   *
    +   * @param functor A pointer to a functor, which should provide a call operator
    +   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
    +   *        The call operator will be invoked once on each key, value pair in
    +   *        this hash table.
    +   * @return The number of key-value pairs visited.
    +   **/
    +  template <typename FunctorT>
    +  inline std::size_t forEachCompositeKey(FunctorT *functor) const;
    +
    +  /**
    +   * @brief Apply a functor to each (key, aggregation state) pair in this hash
    +   *        table, where the aggregation state is retrieved from the value
    +   *        that maps to the corresponding key with the specified index.
    +   *        Composite key version.
    +   *
    +   * @warning This method assumes that no concurrent calls to
    +   *          upsertCompositeKey() or upsertValueAccessorCompositeKey() are
    +   *          taking place (i.e. that this HashTable is immutable for the
    +   *          duration of the call and as long as the returned pointer may be
    +   *          dereferenced). Concurrent calls to getSingleCompositeKey(),
    +   *          forEach(), and forEachCompositeKey() are safe.
    +   *
    +   * @param functor A pointer to a functor, which should provide a call operator
    +   *        which takes 2 arguments: const TypedValue&, const std::uint8_t*.
    +   *        The call operator will be invoked once on each (key, aggregation state)
    +   *        pair in this hash table.
    +   * @param index The index of the target aggregation state among those states
    +   *        mapped to \p key.
    +   * @return The number of key-value pairs visited.
    +   **/
    +  template <typename FunctorT>
    +  inline std::size_t forEachCompositeKey(FunctorT *functor,
    +                                         const std::size_t index) const;
    +
    + private:
    +  void resize(const std::size_t extra_buckets,
    +              const std::size_t extra_variable_storage,
    +              const std::size_t retry_num = 0);
    +
    +  inline std::size_t calculateVariableLengthCompositeKeyCopySize(
    +      const std::vector<TypedValue> &key) const {
    +    std::size_t total = 0;
    +    for (std::vector<TypedValue>::size_type idx = 0; idx < key.size(); ++idx) {
    +      if (!(*key_inline_)[idx]) {
    +        total += key[idx].getDataSize();
    +      }
    +    }
    +    return total;
    +  }
    +
    +  inline bool getNextEntry(TypedValue *key,
    +                           const std::uint8_t **value,
    +                           std::size_t *entry_num) const;
    +
    +  inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
    +                                       const std::uint8_t **value,
    +                                       std::size_t *entry_num) const;
    +
    +  inline std::uint8_t* upsertCompositeKeyInternal(
    +      const std::vector<TypedValue> &key,
    +      const std::size_t variable_key_size);
    +
    +  template <bool use_two_accessors>
    +  inline bool upsertValueAccessorCompositeKeyInternal(
    +      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
    +      const std::vector<MultiSourceAttributeId> &key_ids,
    +      ValueAccessor *base_accessor,
    +      ColumnVectorsValueAccessor *derived_accessor);
    +
    +  // Generate a hash for a composite key by hashing each component of 'key' and
    +  // mixing their bits with CombineHashes().
    +  inline std::size_t hashCompositeKey(const std::vector<TypedValue> &key) const;
    +
    +  // Set information about which key components are stored inline. This usually
    +  // comes from a HashTableKeyManager, and is set by the constructor of a
    +  // subclass of HashTable.
    +  inline void setKeyInline(const std::vector<bool> *key_inline) {
    +    scalar_key_inline_ = key_inline->front();
    +    key_inline_ = key_inline;
    +  }
    +
    +  inline static std::size_t ComputeTotalPayloadSize(
    +      const std::vector<AggregationHandle *> &handles) {
    +    std::size_t total_payload_size = sizeof(SpinMutex);
    +    for (const auto *handle : handles) {
    +      total_payload_size += handle->getPayloadSize();
    +    }
    +    return total_payload_size;
    +  }
    +
    +  // Assign '*key_vector' with the attribute values specified by 'key_ids' at
    +  // the current position of 'accessor'. If 'check_for_null_keys' is true, stops
    +  // and returns true if any of the values is null, otherwise returns false.
    +  template <bool use_two_accessors,
    +            bool check_for_null_keys,
    +            typename ValueAccessorT>
    +  inline static bool GetCompositeKeyFromValueAccessor(
    +      const std::vector<MultiSourceAttributeId> &key_ids,
    +      const ValueAccessorT *accessor,
    +      const ColumnVectorsValueAccessor *derived_accessor,
    +      std::vector<TypedValue> *key_vector) {
    +    for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) {
    +      const MultiSourceAttributeId &key_id = key_ids[key_idx];
    +      if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) {
    +        (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id);
    +      } else {
    +        (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id);
    +      }
    +      if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
    +        return true;
    +      }
    +    }
    +    return false;
    +  }
    +
    +  struct Header {
    +    std::size_t num_slots;
    +    std::size_t num_buckets;
    +    alignas(kCacheLineBytes) std::atomic<std::size_t> buckets_allocated;
    +    alignas(kCacheLineBytes)
    +        std::atomic<std::size_t> variable_length_bytes_allocated;
    +  };
    +
    +  // Type(s) of keys.
    +  const std::vector<const Type *> key_types_;
    +
    +  // Information about whether key components are stored inline or in a
    +  // separate variable-length storage region. This is usually determined by a
    +  // HashTableKeyManager and set by calling setKeyInline().
    +  bool scalar_key_inline_;
    +  const std::vector<bool> *key_inline_;
    +
    +  const std::size_t num_handles_;
    +  const std::vector<AggregationHandle *> handles_;
    +
    +  std::size_t total_payload_size_;
    +  std::vector<std::size_t> payload_offsets_;
    +  std::uint8_t *init_payload_;
    +
    +  StorageManager *storage_manager_;
    +  MutableBlobReference blob_;
    +
    +  // Locked in shared mode for most operations, exclusive mode during resize.
    +  // Not locked at all for non-resizable HashTables.
    +  alignas(kCacheLineBytes) SpinSharedMutex<true> resize_shared_mutex_;
    +
    +  std::size_t kBucketAlignment;
    +
    +  // Value's offset in a bucket is the first alignof(ValueT) boundary after the
    +  // next pointer and hash code.
    +  std::size_t kValueOffset;
    +
    +  // Round bucket size up to a multiple of kBucketAlignment.
    +  inline std::size_t ComputeBucketSize(const std::size_t fixed_key_size) {
    +    return (((kValueOffset + this->total_payload_size_ + fixed_key_size - 1) /
    +             kBucketAlignment) +
    +            1) *
    +           kBucketAlignment;
    +  }
    +
    +  // Attempt to find an empty bucket to insert 'hash_code' into, starting after
    +  // '*bucket' in the chain (or, if '*bucket' is NULL, starting from the slot
    +  // array). Returns true and stores SIZE_T_MAX in '*pending_chain_ptr' if an
    +  // empty bucket is found. Returns false if 'allow_duplicate_keys' is false
    +  // and a hash collision is found (caller should then check whether there is a
    +  // genuine key collision or the hash collision is spurious). Returns false
    +  // and sets '*bucket' to NULL if there are no more empty buckets in the hash
    +  // table. If 'variable_key_allocation_required' is nonzero, this method will
    +  // attempt to allocate storage for a variable-length key BEFORE allocating a
    +  // bucket, so that no bucket number below 'header_->num_buckets' is ever
    +  // deallocated after being allocated.
    +  inline bool locateBucketForInsertion(
    +      const std::size_t hash_code,
    +      const std::size_t variable_key_allocation_required,
    +      void **bucket,
    +      std::atomic<std::size_t> **pending_chain_ptr,
    +      std::size_t *pending_chain_ptr_finish_value);
    +
    +  // Write a scalar 'key' and its 'hash_code' into the '*bucket', which was
    +  // found by locateBucketForInsertion(). Assumes that storage for a
    +  // variable-length key copy (if any) was already allocated by a successful
    +  // call to allocateVariableLengthKeyStorage().
    +  inline void writeScalarKeyToBucket(
    +      const TypedValue &key,
    +      const std::size_t hash_code,
    +      void *bucket);
    +
    +  // Write a composite 'key' and its 'hash_code' into the '*bucket', which was
    +  // found by locateBucketForInsertion(). Assumes that storage for
    +  // variable-length key copies (if any) was already allocated by a successful
    +  // call to allocateVariableLengthKeyStorage().
    +  inline void writeCompositeKeyToBucket(
    +      const std::vector<TypedValue> &key,
    +      const std::size_t hash_code,
    +      void *bucket);
    +
    +  // Determine whether it is actually necessary to resize this hash table.
    +  // Checks that there is at least one unallocated bucket, and that there is
    +  // at least 'extra_variable_storage' bytes of variable-length storage free.
    +  inline bool isFull(const std::size_t extra_variable_storage) const;
    +
    +  // Helper object to manage key storage.
    +  HashTableKeyManager<false, true> key_manager_;
    +
    +  // In-memory structure is as follows:
    +  //   - SeparateChainingHashTable::Header
    +  //   - Array of slots, interpreted as follows:
    +  //       - 0 = Points to nothing (empty)
    +  //       - SIZE_T_MAX = Pending (some thread is starting a chain from this
    +  //         slot and will overwrite it soon)
    +  //       - Anything else = The number of the first bucket in the chain for
    +  //         this slot PLUS ONE (i.e. subtract one to get the actual bucket
    +  //         number).
    +  //   - Array of buckets, each of which is:
    +  //       - atomic size_t "next" pointer, interpreted the same as slots above.
    +  //       - size_t hash value
    +  //       - possibly some unused bytes as needed so that ValueT's alignment
    +  //         requirement is met
    +  //       - ValueT value slot
    +  //       - fixed-length key storage (which may include pointers to external
    +  //         memory or offsets of variable length keys stored within this hash
    +  //         table)
    +  //       - possibly some additional unused bytes so that bucket size is a
    +  //         multiple of both alignof(std::atomic<std::size_t>) and
    +  //         alignof(ValueT)
    +  //   - Variable-length key storage region (referenced by offsets stored in
    +  //     fixed-length keys).
    +  Header *header_;
    +
    +  std::atomic<std::size_t> *slots_;
    +  void *buckets_;
    +  const std::size_t bucket_size_;
    +
    +  DISALLOW_COPY_AND_ASSIGN(PackedPayloadHashTable);
    +};
    +
    +/** @} */
    +
    +// ----------------------------------------------------------------------------
    +// Implementations of template class methods follow.
    +
    +class HashTableMerger {
    + public:
    +  /**
    +   * @brief Constructor
    +   *
    +   * @param handle The Aggregation handle being used.
    +   * @param destination_hash_table The destination hash table to which other
    +   *        hash tables will be merged.
    +   **/
    +  explicit HashTableMerger(PackedPayloadHashTable *destination_hash_table)
    +      : destination_hash_table_(destination_hash_table) {}
    +
    +  /**
    +   * @brief The operator for the functor.
    +   *
    +   * @param group_by_key The group by key being merged.
    +   * @param source_state The aggregation state for the given key in the source
    +   *        aggregation hash table.
    +   **/
    +  inline void operator()(const std::vector<TypedValue> &group_by_key,
    +                         const std::uint8_t *source_state) {
    +    destination_hash_table_->upsertCompositeKey(group_by_key, source_state);
    +  }
    +
    + private:
    +  PackedPayloadHashTable *destination_hash_table_;
    +
    +  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
    +};
    +
    +inline std::size_t PackedPayloadHashTable::hashCompositeKey(
    +    const std::vector<TypedValue> &key) const {
    +  DEBUG_ASSERT(!key.empty());
    +  DEBUG_ASSERT(key.size() == key_types_.size());
    +  std::size_t hash = key.front().getHash();
    +  for (std::vector<TypedValue>::const_iterator key_it = key.begin() + 1;
    +       key_it != key.end();
    +       ++key_it) {
    +    hash = CombineHashes(hash, key_it->getHash());
    +  }
    +  return hash;
    +}
    +
    +inline bool PackedPayloadHashTable::getNextEntry(TypedValue *key,
    +                                                 const std::uint8_t **value,
    +                                                 std::size_t *entry_num) const {
    +  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
    +    const char *bucket =
    +        static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
    +    *key = key_manager_.getKeyComponentTyped(bucket, 0);
    +    *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
    +    ++(*entry_num);
    +    return true;
    +  } else {
    +    return false;
    +  }
    --- End diff --
    
    Minor, but we could flip the condition:
    
    ```
    
      if (*entry_num >= header_->buckets_allocated.load(std::memory_order_relaxed)) {
        return false;
      }
    
      const char *bucket =
          static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
      *key = key_manager_.getKeyComponentTyped(bucket, 0);
      *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
      ++(*entry_num);
      return true;
    ```
    
    Similarly below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505589
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    +  if (is_aggregate_collision_free_) {
    +    static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->initialize(partition_id);
    +  } else {
    +    LOG(FATAL) << "AggregationOperationState::initializeState() "
    +               << "is not supported by this aggregation";
    +  }
    +}
     
    +void AggregationOperationState::aggregateBlock(const block_id input_block,
    +                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
       BlockReference block(
           storage_manager_->getBlock(input_block, input_relation_));
    +  const auto &tuple_store = block->getTupleStorageSubBlock();
    +  std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
    +  std::unique_ptr<ValueAccessor> shared_accessor;
    +  ValueAccessor *accessor = base_accessor.get();
     
    +  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
    +  // as the existence map for the tuples.
       std::unique_ptr<TupleIdSequence> matches;
       if (predicate_ != nullptr) {
    -    std::unique_ptr<ValueAccessor> accessor(
    -        block->getTupleStorageSubBlock().createValueAccessor());
    -    matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
    +    matches.reset(block->getMatchesForPredicate(predicate_.get()));
    +    shared_accessor.reset(
    +        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
    +    accessor = shared_accessor.get();
    +  }
    +  if (lip_filter_adaptive_prober != nullptr) {
    +    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
    +    shared_accessor.reset(
    +        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
    +    accessor = shared_accessor.get();
       }
     
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
    -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
    -    // If all arguments are attributes of the input relation, elide a copy.
    -    if (!arguments_as_attributes_[agg_idx].empty()) {
    -      local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
    +  std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
    +  if (!non_trivial_expressions_.empty()) {
    +    non_trivial_results.reset(new ColumnVectorsValueAccessor());
    +    SubBlocksReference sub_blocks_ref(tuple_store,
    +                                      block->getIndices(),
    +                                      block->getIndicesConsistent());
    +    for (const auto &expression : non_trivial_expressions_) {
    +      non_trivial_results->addColumn(
    +          expression->getAllValues(accessor, &sub_blocks_ref));
         }
    -#endif
    +  }
    +
    +  accessor->beginIterationVirtual();
    +
    +  ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get());
    +  if (group_by_key_ids_.empty()) {
    +    aggregateBlockSingleState(accessor_mux);
    +  } else {
    +    aggregateBlockHashTable(accessor_mux);
    +  }
    +}
    +
    +void AggregationOperationState::aggregateBlockSingleState(
    +    const ValueAccessorMultiplexer &accessor_mux) {
    +  // Aggregate per-block state for each aggregate.
    +  std::vector<std::unique_ptr<AggregationState>> local_state;
    +
    +  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    +    const auto &argument_ids = argument_ids_[agg_idx];
    +    const auto &handle = handles_[agg_idx];
    +
    +    AggregationState *state = nullptr;
         if (is_distinct_[agg_idx]) {
    -      // Call StorageBlock::aggregateDistinct() to put the arguments as keys
    -      // directly into the (threadsafe) shared global distinctify HashTable
    -      // for this aggregate.
    -      block->aggregateDistinct(*handles_[agg_idx],
    -                               arguments_[agg_idx],
    -                               local_arguments_as_attributes,
    -                               {}, /* group_by */
    -                               matches.get(),
    -                               distinctify_hashtables_[agg_idx].get(),
    -                               nullptr /* reuse_group_by_vectors */);
    -      local_state.emplace_back(nullptr);
    +      handle->insertValueAccessorIntoDistinctifyHashTable(
    +          argument_ids,
    +          {},
    +          accessor_mux,
    +          distinctify_hashtables_[agg_idx].get());
         } else {
    -      // Call StorageBlock::aggregate() to actually do the aggregation.
    -      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
    -                                                arguments_[agg_idx],
    -                                                local_arguments_as_attributes,
    -                                                matches.get()));
    +      if (argument_ids.empty()) {
    +        // Special case. This is a nullary aggregate (i.e. COUNT(*)).
    +        ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
    +        DCHECK(base_accessor != nullptr);
    +        state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual());
    +      } else {
    +        // Have the AggregationHandle actually do the aggregation.
    +        state = handle->accumulateValueAccessor(argument_ids, accessor_mux);
    +      }
         }
    +    local_state.emplace_back(state);
       }
     
       // Merge per-block aggregation states back with global state.
       mergeSingleState(local_state);
     }
     
    -void AggregationOperationState::aggregateBlockHashTable(
    -    const block_id input_block,
    -    LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  BlockReference block(
    -      storage_manager_->getBlock(input_block, input_relation_));
    -
    -  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
    -  // as the existence map for the tuples.
    -  std::unique_ptr<TupleIdSequence> matches;
    -  if (predicate_ != nullptr) {
    -    matches.reset(block->getMatchesForPredicate(predicate_.get()));
    -  }
    -  if (lip_filter_adaptive_prober != nullptr) {
    -    std::unique_ptr<ValueAccessor> accessor(
    -        block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
    -    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
    -  }
    -
    -  // This holds values of all the GROUP BY attributes so that the can be reused
    -  // across multiple aggregates (i.e. we only pay the cost of evaluatin the
    -  // GROUP BY expressions once).
    -  std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
    -
    +void AggregationOperationState::mergeSingleState(
    +    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    +  DEBUG_ASSERT(local_state.size() == single_states_.size());
    --- End diff --
    
    Use `DCHECK_EQ` instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99510443
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -556,80 +655,83 @@ void AggregationOperationState::finalizeSingleState(
       output_destination->insertTuple(Tuple(std::move(attribute_values)));
     }
     
    -void AggregationOperationState::mergeGroupByHashTables(
    -    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
    -  HashTableMergerFast merger(dst);
    -  (static_cast<FastHashTable<true, false, true, false> *>(src))
    -      ->forEachCompositeKeyFast(&merger);
    +void AggregationOperationState::finalizeHashTable(
    +    const std::size_t partition_id,
    +    InsertDestination *output_destination) {
    +  if (is_aggregate_collision_free_) {
    +    finalizeHashTableImplCollisionFree(partition_id, output_destination);
    +  } else if (is_aggregate_partitioned_) {
    +    finalizeHashTableImplPartitioned(partition_id, output_destination);
    +  } else {
    +    DCHECK_EQ(0u, partition_id);
    +    finalizeHashTableImplThreadPrivate(output_destination);
    +  }
     }
     
    -void AggregationOperationState::finalizeHashTable(
    +void AggregationOperationState::finalizeHashTableImplCollisionFree(
    +    const std::size_t partition_id,
    +    InsertDestination *output_destination) {
    +  std::vector<std::unique_ptr<ColumnVector>> final_values;
    +  CollisionFreeVectorTable *hash_table =
    +      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
    +
    +  const std::size_t max_length =
    +      hash_table->getNumTuplesInFinalizationPartition(partition_id);
    +  ColumnVectorsValueAccessor complete_result;
    +
    +  DCHECK_EQ(1u, group_by_types_.size());
    +  const Type *key_type = group_by_types_.front();
    +  DCHECK(NativeColumnVector::UsableForType(*key_type));
    +
    +  std::unique_ptr<NativeColumnVector> key_cv(
    +      new NativeColumnVector(*key_type, max_length));
    +  hash_table->finalizeKey(partition_id, key_cv.get());
    +  complete_result.addColumn(key_cv.release());
    +
    +  for (std::size_t i = 0; i < handles_.size(); ++i) {
    +    const Type *result_type = handles_[i]->getResultType();
    +    DCHECK(NativeColumnVector::UsableForType(*result_type));
    +
    +    std::unique_ptr<NativeColumnVector> result_cv(
    +        new NativeColumnVector(*result_type, max_length));
    --- End diff --
    
    Please avoid using `new` and instead use `std::make_unique`.
    
    `auto result_cv = std::make_unique<NativeColumnVector>(*result_type, max_length);`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-quickstep/pull/179


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99883469
  
    --- Diff: query_optimizer/ExecutionGenerator.hpp ---
    @@ -427,7 +445,7 @@ class ExecutionGenerator {
       /**
        * @brief The cost model to use for estimating aggregation hash table size.
        */
    -  std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
    +  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_;
    --- End diff --
    
    We need some extra analysis done in `StarSchemaSimpleCostModel`. I will refactor the analysis part out of `StarSchemaSimpleCostModel` into `AnalysisUtil` in a follow-up PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99517170
  
    --- Diff: storage/PackedPayloadHashTable.cpp ---
    @@ -0,0 +1,463 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/PackedPayloadHashTable.hpp"
    +
    +#include <algorithm>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstdlib>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageConstants.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/Type.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/Alignment.hpp"
    +#include "utility/Macros.hpp"
    +#include "utility/PrimeNumber.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +PackedPayloadHashTable::PackedPayloadHashTable(
    +    const std::vector<const Type *> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle *> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      num_handles_(handles.size()),
    +      handles_(handles),
    +      total_payload_size_(ComputeTotalPayloadSize(handles)),
    +      storage_manager_(storage_manager),
    +      kBucketAlignment(alignof(std::atomic<std::size_t>)),
    +      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
    +      key_manager_(key_types_, kValueOffset + total_payload_size_),
    +      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
    +  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
    +  for (const auto *handle : handles) {
    +    payload_offsets_.emplace_back(payload_offset_running_sum);
    +    payload_offset_running_sum += handle->getPayloadSize();
    +  }
    +
    +  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
    +  // init_payload to buckets if payload contains out of line data.
    +  init_payload_ =
    +      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
    +  DCHECK(init_payload_ != nullptr);
    +
    +  for (std::size_t i = 0; i < num_handles_; ++i) {
    +    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
    +  }
    +
    +  // Bucket size always rounds up to the alignment requirement of the atomic
    +  // size_t "next" pointer at the front or a ValueT, whichever is larger.
    +  //
    +  // Give base HashTable information about what key components are stored
    +  // inline from 'key_manager_'.
    +  setKeyInline(key_manager_.getKeyInline());
    +
    +  // Pick out a prime number of slots and calculate storage requirements.
    +  std::size_t num_slots_tmp =
    +      get_next_prime_number(num_entries * kHashTableLoadFactor);
    +  std::size_t required_memory =
    +      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
    +      (num_slots_tmp / kHashTableLoadFactor) *
    +          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
    +  std::size_t num_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(required_memory);
    +  if (num_storage_slots == 0) {
    +    FATAL_ERROR(
    +        "Storage requirement for SeparateChainingHashTable "
    +        "exceeds maximum allocation size.");
    +  }
    +
    +  // Get a StorageBlob to hold the hash table.
    +  const block_id blob_id =
    +      this->storage_manager_->createBlob(num_storage_slots);
    +  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
    +
    +  void *aligned_memory_start = this->blob_->getMemoryMutable();
    +  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
    +  if (align(alignof(Header),
    --- End diff --
    
    Refactor using `CHECK`.
    
    Similarly below.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99504376
  
    --- Diff: query_optimizer/ExecutionGenerator.hpp ---
    @@ -427,7 +445,7 @@ class ExecutionGenerator {
       /**
        * @brief The cost model to use for estimating aggregation hash table size.
        */
    -  std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
    +  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_;
    --- End diff --
    
    I think no needs for this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505552
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    +  if (is_aggregate_collision_free_) {
    +    static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->initialize(partition_id);
    +  } else {
    +    LOG(FATAL) << "AggregationOperationState::initializeState() "
    --- End diff --
    
    How about `LOG(WARNING)` instead of crashing?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    There is a `CMakeLists` to be updated -- do not merge at this moment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505207
  
    --- Diff: relational_operators/WorkOrderFactory.cpp ---
    @@ -186,6 +186,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           LOG(INFO) << "Creating FinalizeAggregationWorkOrder in Shiftboss " << shiftboss_index;
           return new FinalizeAggregationWorkOrder(
               proto.query_id(),
    +          0uL,
    --- End diff --
    
    partition_id?
    
    Please add a `TODO: refactor using partition_id from proto, like BuildHashWorkOrder`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99882605
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    +    return false;
    +  }
    +
    +  // We need to know the exact min/max stats of the group-by key.
    +  // So it must be a CatalogAttribute (but not an expression).
    +  E::AttributeReferencePtr group_by_key_attr;
    +  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
    +  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
    +    return false;
    +  }
    +
    +  bool min_value_stat_is_exact;
    +  bool max_value_stat_is_exact;
    +  const TypedValue min_value =
    +      cost_model_for_aggregation_->findMinValueStat(
    +          aggregate, group_by_key_attr, &min_value_stat_is_exact);
    +  const TypedValue max_value =
    +      cost_model_for_aggregation_->findMaxValueStat(
    +          aggregate, group_by_key_attr, &max_value_stat_is_exact);
    +  if (min_value.isNull() || max_value.isNull() ||
    +      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
    +    return false;
    +  }
    +
    +  std::int64_t min_cpp_value;
    +  std::int64_t max_cpp_value;
    +  switch (group_by_key_attr->getValueType().getTypeID()) {
    +    case TypeID::kInt: {
    +      min_cpp_value = min_value.getLiteral<int>();
    +      max_cpp_value = max_value.getLiteral<int>();
    +      break;
    +    }
    +    case TypeID::kLong: {
    +      min_cpp_value = min_value.getLiteral<std::int64_t>();
    +      max_cpp_value = max_value.getLiteral<std::int64_t>();
    +      break;
    +    }
    +    default:
    +      return false;
    +  }
    +
    +  // TODO(jianqiao):
    +  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
    +  // 2. Reason about the table size bound (e.g. by checking memory size) instead
    +  //    of hardcoding it as a gflag.
    +  if (min_cpp_value < 0 ||
    +      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
    +      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
    +    return false;
    +  }
    +
    +  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
    +    const E::AggregateFunctionPtr agg_func =
    +        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
    +
    +    if (agg_func->is_distinct()) {
    +      return false;
    +    }
    +
    +    // TODO(jianqiao): Support AggregationID::AVG.
    +    switch (agg_func->getAggregate().getAggregationID()) {
    --- End diff --
    
    Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99925160
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    --- End diff --
    
    May two segmented hash tables have the same hashed key?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505348
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState(
         const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
         StorageManager *storage_manager)
         : input_relation_(input_relation),
    -      is_aggregate_partitioned_(checkAggregatePartitioned(
    -          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
    +      is_aggregate_collision_free_(false),
    +      is_aggregate_partitioned_(false),
    --- End diff --
    
    FYI, in the hash join, we have a fixed number of hash tables, and treat no partition case as `num_partitions` is one.
    
    I think we could do similar for the aggregation, and pass the fixed `num_partitions` from the optimizer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    Currently there is a gflag for setting the range upbound:
    https://github.com/apache/incubator-quickstep/pull/179/files#diff-3d4b5df01ed8edbe255e096eefbfc342R440


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99922836
  
    --- Diff: relational_operators/InitializeAggregationOperator.cpp ---
    @@ -0,0 +1,72 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "relational_operators/InitializeAggregationOperator.hpp"
    +
    +#include <cstddef>
    +
    +#include "query_execution/QueryContext.hpp"
    +#include "query_execution/WorkOrderProtosContainer.hpp"
    +#include "query_execution/WorkOrdersContainer.hpp"
    +#include "relational_operators/WorkOrder.pb.h"
    +#include "storage/AggregationOperationState.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace quickstep {
    +
    +bool InitializeAggregationOperator::getAllWorkOrders(
    +    WorkOrdersContainer *container,
    +    QueryContext *query_context,
    +    StorageManager *storage_manager,
    +    const tmb::client_id scheduler_client_id,
    +    tmb::MessageBus *bus) {
    +  if (!started_) {
    +    AggregationOperationState *agg_state =
    +        query_context->getAggregationState(aggr_state_index_);
    +    DCHECK(agg_state != nullptr);
    +
    +    for (std::size_t part_id = 0;
    +         part_id < agg_state->getNumInitializationPartitions();
    +         ++part_id) {
    +      container->addNormalWorkOrder(
    +          new InitializeAggregationWorkOrder(query_id_,
    +                                             part_id,
    +                                             agg_state),
    +          op_index_);
    +    }
    +    started_ = true;
    +  }
    +  return true;
    +}
    +
    +// TODO(quickstep-team) : Think about how the number of partitions could be
    +// accessed in this function. Until then, we can't use partitioned aggregation
    +// initialization with the distributed version.
    --- End diff --
    
    The "partition" here is kind of inner-table partitioning that is not related to the partitioning of storage blocks. To get the "number of partitions" here, we need the "AggregationOperationState" object, which is not available at this moment.
    
    One solution is to calculate the # of partitions value before creating the `AggregationOperationState` object. That requires quite some refactoring. We can see if we need to do that later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    Updated, and tested locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505477
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -269,7 +269,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
             proto.group_by_expressions(group_by_idx), database));
       }
     
    -  unique_ptr<Predicate> predicate;
    +  std::unique_ptr<Predicate> predicate;
    --- End diff --
    
    No need for this. We have `using std::unique_ptr` above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99504160
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    --- End diff --
    
    For multiple small group-by keys, I think we could create a multi-dimension array for the same goal as the single key.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99927590
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    --- End diff --
    
    No. Every segment contains an exclusive subset of buckets. Every bucket corresponds to a unique key.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99884619
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -1495,9 +1607,28 @@ void ExecutionGenerator::convertAggregate(
       }
     
       if (!group_by_types.empty()) {
    -    // Right now, only SeparateChaining is supported.
    -    aggr_state_proto->set_hash_table_impl_type(
    -        serialization::HashTableImplType::SEPARATE_CHAINING);
    +    const std::size_t estimated_num_groups =
    +        cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
    +
    +    std::size_t max_num_groups;
    +    const bool can_use_collision_free_aggregation =
    +        canUseCollisionFreeAggregation(physical_plan,
    +                                       estimated_num_groups,
    +                                       &max_num_groups);
    +
    +    if (can_use_collision_free_aggregation) {
    --- End diff --
    
    Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99504279
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    +    return false;
    +  }
    +
    +  // We need to know the exact min/max stats of the group-by key.
    +  // So it must be a CatalogAttribute (but not an expression).
    +  E::AttributeReferencePtr group_by_key_attr;
    +  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
    +  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
    +    return false;
    +  }
    +
    +  bool min_value_stat_is_exact;
    +  bool max_value_stat_is_exact;
    +  const TypedValue min_value =
    +      cost_model_for_aggregation_->findMinValueStat(
    +          aggregate, group_by_key_attr, &min_value_stat_is_exact);
    +  const TypedValue max_value =
    +      cost_model_for_aggregation_->findMaxValueStat(
    +          aggregate, group_by_key_attr, &max_value_stat_is_exact);
    +  if (min_value.isNull() || max_value.isNull() ||
    +      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
    +    return false;
    +  }
    +
    +  std::int64_t min_cpp_value;
    +  std::int64_t max_cpp_value;
    +  switch (group_by_key_attr->getValueType().getTypeID()) {
    +    case TypeID::kInt: {
    +      min_cpp_value = min_value.getLiteral<int>();
    +      max_cpp_value = max_value.getLiteral<int>();
    +      break;
    +    }
    +    case TypeID::kLong: {
    +      min_cpp_value = min_value.getLiteral<std::int64_t>();
    +      max_cpp_value = max_value.getLiteral<std::int64_t>();
    +      break;
    +    }
    +    default:
    +      return false;
    +  }
    +
    +  // TODO(jianqiao):
    +  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
    +  // 2. Reason about the table size bound (e.g. by checking memory size) instead
    +  //    of hardcoding it as a gflag.
    +  if (min_cpp_value < 0 ||
    +      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
    +      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
    +    return false;
    +  }
    +
    +  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
    +    const E::AggregateFunctionPtr agg_func =
    +        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
    +
    +    if (agg_func->is_distinct()) {
    +      return false;
    +    }
    +
    +    // TODO(jianqiao): Support AggregationID::AVG.
    +    switch (agg_func->getAggregate().getAggregationID()) {
    --- End diff --
    
    Refactor using `QUICKSTEP_EQUALS_ANY_CONSTANT` defined in `utility/EqualsAnyConstant.hpp`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by pateljm <gi...@git.apache.org>.
Github user pateljm commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    Very impressive algorithmic change @jianqiao !!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99930761
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -556,80 +655,83 @@ void AggregationOperationState::finalizeSingleState(
       output_destination->insertTuple(Tuple(std::move(attribute_values)));
     }
     
    -void AggregationOperationState::mergeGroupByHashTables(
    -    AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
    -  HashTableMergerFast merger(dst);
    -  (static_cast<FastHashTable<true, false, true, false> *>(src))
    -      ->forEachCompositeKeyFast(&merger);
    +void AggregationOperationState::finalizeHashTable(
    +    const std::size_t partition_id,
    +    InsertDestination *output_destination) {
    +  if (is_aggregate_collision_free_) {
    +    finalizeHashTableImplCollisionFree(partition_id, output_destination);
    +  } else if (is_aggregate_partitioned_) {
    +    finalizeHashTableImplPartitioned(partition_id, output_destination);
    +  } else {
    +    DCHECK_EQ(0u, partition_id);
    +    finalizeHashTableImplThreadPrivate(output_destination);
    +  }
     }
     
    -void AggregationOperationState::finalizeHashTable(
    +void AggregationOperationState::finalizeHashTableImplCollisionFree(
    +    const std::size_t partition_id,
    +    InsertDestination *output_destination) {
    +  std::vector<std::unique_ptr<ColumnVector>> final_values;
    +  CollisionFreeVectorTable *hash_table =
    +      static_cast<CollisionFreeVectorTable *>(collision_free_hashtable_.get());
    +
    +  const std::size_t max_length =
    +      hash_table->getNumTuplesInFinalizationPartition(partition_id);
    +  ColumnVectorsValueAccessor complete_result;
    +
    +  DCHECK_EQ(1u, group_by_types_.size());
    +  const Type *key_type = group_by_types_.front();
    +  DCHECK(NativeColumnVector::UsableForType(*key_type));
    +
    +  std::unique_ptr<NativeColumnVector> key_cv(
    +      new NativeColumnVector(*key_type, max_length));
    +  hash_table->finalizeKey(partition_id, key_cv.get());
    +  complete_result.addColumn(key_cv.release());
    +
    +  for (std::size_t i = 0; i < handles_.size(); ++i) {
    +    const Type *result_type = handles_[i]->getResultType();
    +    DCHECK(NativeColumnVector::UsableForType(*result_type));
    +
    +    std::unique_ptr<NativeColumnVector> result_cv(
    +        new NativeColumnVector(*result_type, max_length));
    --- End diff --
    
    Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    Hi @jianqiao,
    
    A quick question on Feature 1 using a vector-based aggregation: for a group-by w/ a known bounded range (i.e., the min and max value), do we always choose this approach over the hash-based, or depending on the range size (i.e., if the range is too wide, we may fall back to the hash-based)? Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99504206
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    +    return false;
    +  }
    +
    +  // We need to know the exact min/max stats of the group-by key.
    +  // So it must be a CatalogAttribute (but not an expression).
    +  E::AttributeReferencePtr group_by_key_attr;
    +  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
    +  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
    +    return false;
    +  }
    +
    +  bool min_value_stat_is_exact;
    +  bool max_value_stat_is_exact;
    +  const TypedValue min_value =
    +      cost_model_for_aggregation_->findMinValueStat(
    +          aggregate, group_by_key_attr, &min_value_stat_is_exact);
    +  const TypedValue max_value =
    +      cost_model_for_aggregation_->findMaxValueStat(
    +          aggregate, group_by_key_attr, &max_value_stat_is_exact);
    +  if (min_value.isNull() || max_value.isNull() ||
    +      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
    +    return false;
    +  }
    +
    +  std::int64_t min_cpp_value;
    +  std::int64_t max_cpp_value;
    +  switch (group_by_key_attr->getValueType().getTypeID()) {
    +    case TypeID::kInt: {
    +      min_cpp_value = min_value.getLiteral<int>();
    +      max_cpp_value = max_value.getLiteral<int>();
    +      break;
    +    }
    +    case TypeID::kLong: {
    +      min_cpp_value = min_value.getLiteral<std::int64_t>();
    +      max_cpp_value = max_value.getLiteral<std::int64_t>();
    +      break;
    +    }
    +    default:
    +      return false;
    --- End diff --
    
    Is the reason of supporting only such types about the overflow?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99510612
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState(
         const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
         StorageManager *storage_manager)
         : input_relation_(input_relation),
    -      is_aggregate_partitioned_(checkAggregatePartitioned(
    -          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
    +      is_aggregate_collision_free_(false),
    +      is_aggregate_partitioned_(false),
           predicate_(predicate),
    -      group_by_list_(std::move(group_by)),
    -      arguments_(std::move(arguments)),
           is_distinct_(std::move(is_distinct)),
           storage_manager_(storage_manager) {
    +  if (!group_by.empty()) {
    +    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
    +      is_aggregate_collision_free_ = true;
    +    } else {
    +      is_aggregate_partitioned_ = checkAggregatePartitioned(
    +          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
    +    }
    +  }
    +
       // Sanity checks: each aggregate has a corresponding list of arguments.
    -  DCHECK(aggregate_functions.size() == arguments_.size());
    +  DCHECK(aggregate_functions.size() == arguments.size());
     
       // Get the types of GROUP BY expressions for creating HashTables below.
    -  std::vector<const Type *> group_by_types;
    -  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
    -    group_by_types.emplace_back(&group_by_element->getType());
    +  for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    group_by_types_.emplace_back(&group_by_element->getType());
    +  }
    +
    +  // Prepare group-by key ids and non-trivial expressions.
    +  for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    const attribute_id attr_id =
    +        group_by_element->getAttributeIdForValueAccessor();
    +    if (attr_id != kInvalidAttributeID) {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
    +    } else {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
    +                                     non_trivial_expressions_.size());
    +      non_trivial_expressions_.emplace_back(group_by_element.release());
    +    }
       }
     
       std::vector<AggregationHandle *> group_by_handles;
    -  group_by_handles.clear();
    -
    -  if (aggregate_functions.size() == 0) {
    -    // If there is no aggregation function, then it is a distinctify operation
    -    // on the group-by expressions.
    -    DCHECK_GT(group_by_list_.size(), 0u);
    -
    -    handles_.emplace_back(new AggregationHandleDistinct());
    -    arguments_.push_back({});
    -    is_distinct_.emplace_back(false);
    -    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                     hash_table_impl_type,
    -                                                     group_by_types,
    -                                                     {1},
    -                                                     handles_,
    -                                                     storage_manager));
    -  } else {
    -    // Set up each individual aggregate in this operation.
    -    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    -        aggregate_functions.begin();
    -    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
    -        args_it = arguments_.begin();
    -    std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    -    std::vector<HashTableImplType>::const_iterator
    -        distinctify_hash_table_impl_types_it =
    -            distinctify_hash_table_impl_types.begin();
    -    std::vector<std::size_t> payload_sizes;
    -    for (; agg_func_it != aggregate_functions.end();
    -         ++agg_func_it, ++args_it, ++is_distinct_it) {
    -      // Get the Types of this aggregate's arguments so that we can create an
    -      // AggregationHandle.
    -      std::vector<const Type *> argument_types;
    -      for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -        argument_types.emplace_back(&argument->getType());
    -      }
     
    -      // Sanity checks: aggregate function exists and can apply to the specified
    -      // arguments.
    -      DCHECK(*agg_func_it != nullptr);
    -      DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
    -
    -      // Have the AggregateFunction create an AggregationHandle that we can use
    -      // to do actual aggregate computation.
    -      handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    -
    -      if (!group_by_list_.empty()) {
    -        // Aggregation with GROUP BY: combined payload is partially updated in
    -        // the presence of DISTINCT.
    -        if (*is_distinct_it) {
    -          handles_.back()->blockUpdate();
    -        }
    -        group_by_handles.emplace_back(handles_.back());
    -        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
    +  // Set up each individual aggregate in this operation.
    +  std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    +      aggregate_functions.begin();
    +  std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
    +      args_it = arguments.begin();
    +  std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    +  std::vector<HashTableImplType>::const_iterator
    +      distinctify_hash_table_impl_types_it =
    +          distinctify_hash_table_impl_types.begin();
    +  for (; agg_func_it != aggregate_functions.end();
    +       ++agg_func_it, ++args_it, ++is_distinct_it) {
    +    // Get the Types of this aggregate's arguments so that we can create an
    +    // AggregationHandle.
    +    std::vector<const Type *> argument_types;
    +    for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    +      argument_types.emplace_back(&argument->getType());
    +    }
    +
    +    // Prepare argument attribute ids and non-trivial expressions.
    +    std::vector<MultiSourceAttributeId> argument_ids;
    +    for (std::unique_ptr<const Scalar> &argument : *args_it) {
    +      const attribute_id attr_id =
    +          argument->getAttributeIdForValueAccessor();
    +      if (attr_id != kInvalidAttributeID) {
    +        argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
           } else {
    -        // Aggregation without GROUP BY: create a single global state.
    -        single_states_.emplace_back(handles_.back()->createInitialState());
    -
    -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
    -        // See if all of this aggregate's arguments are attributes in the input
    -        // relation. If so, remember the attribute IDs so that we can do copy
    -        // elision when actually performing the aggregation.
    -        std::vector<attribute_id> local_arguments_as_attributes;
    -        local_arguments_as_attributes.reserve(args_it->size());
    -        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -          const attribute_id argument_id =
    -              argument->getAttributeIdForValueAccessor();
    -          if (argument_id == -1) {
    -            local_arguments_as_attributes.clear();
    -            break;
    -          } else {
    -            DCHECK_EQ(input_relation_.getID(),
    -                      argument->getRelationIdForValueAccessor());
    -            local_arguments_as_attributes.push_back(argument_id);
    -          }
    -        }
    -
    -        arguments_as_attributes_.emplace_back(
    -            std::move(local_arguments_as_attributes));
    -#endif
    +        argument_ids.emplace_back(ValueAccessorSource::kDerived,
    +                                  non_trivial_expressions_.size());
    +        non_trivial_expressions_.emplace_back(argument.release());
           }
    +    }
    +    argument_ids_.emplace_back(std::move(argument_ids));
    +
    +    // Sanity checks: aggregate function exists and can apply to the specified
    +    // arguments.
    +    DCHECK(*agg_func_it != nullptr);
    +    DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
     
    -      // Initialize the corresponding distinctify hash table if this is a
    -      // DISTINCT aggregation.
    +    // Have the AggregateFunction create an AggregationHandle that we can use
    +    // to do actual aggregate computation.
    +    handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    +
    +    if (!group_by_key_ids_.empty()) {
    +      // Aggregation with GROUP BY: combined payload is partially updated in
    +      // the presence of DISTINCT.
           if (*is_distinct_it) {
    -        std::vector<const Type *> key_types(group_by_types);
    -        key_types.insert(
    -            key_types.end(), argument_types.begin(), argument_types.end());
    -        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    -        // estimating the number of entries in the distinctify hash table.
    -        // We may estimate for each distinct aggregation an
    -        // estimated_num_distinct_keys value during query optimization, if it's
    -        // worth.
    -        distinctify_hashtables_.emplace_back(
    -            AggregationStateFastHashTableFactory::CreateResizable(
    -                *distinctify_hash_table_impl_types_it,
    -                key_types,
    -                estimated_num_entries,
    -                {0},
    -                {},
    -                storage_manager));
    -        ++distinctify_hash_table_impl_types_it;
    -      } else {
    -        distinctify_hashtables_.emplace_back(nullptr);
    +        handles_.back()->blockUpdate();
           }
    +      group_by_handles.emplace_back(handles_.back().get());
    +    } else {
    +      // Aggregation without GROUP BY: create a single global state.
    +      single_states_.emplace_back(handles_.back()->createInitialState());
         }
     
    -    if (!group_by_handles.empty()) {
    -      // Aggregation with GROUP BY: create a HashTable pool.
    -      if (!is_aggregate_partitioned_) {
    -        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                         hash_table_impl_type,
    -                                                         group_by_types,
    -                                                         payload_sizes,
    -                                                         group_by_handles,
    -                                                         storage_manager));
    -      } else {
    -        partitioned_group_by_hashtable_pool_.reset(
    -            new PartitionedHashTablePool(estimated_num_entries,
    -                                         FLAGS_num_aggregation_partitions,
    -                                         hash_table_impl_type,
    -                                         group_by_types,
    -                                         payload_sizes,
    -                                         group_by_handles,
    -                                         storage_manager));
    -      }
    +    // Initialize the corresponding distinctify hash table if this is a
    +    // DISTINCT aggregation.
    +    if (*is_distinct_it) {
    +      std::vector<const Type *> key_types(group_by_types_);
    +      key_types.insert(
    +          key_types.end(), argument_types.begin(), argument_types.end());
    +      // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    +      // estimating the number of entries in the distinctify hash table.
    +      // We need to estimate for each distinct aggregation an
    +      // estimated_num_distinct_keys value during query optimization.
    +      distinctify_hashtables_.emplace_back(
    +          AggregationStateHashTableFactory::CreateResizable(
    +              *distinctify_hash_table_impl_types_it,
    +              key_types,
    +              estimated_num_entries,
    +              {},
    +              storage_manager));
    +      ++distinctify_hash_table_impl_types_it;
    +    } else {
    +      distinctify_hashtables_.emplace_back(nullptr);
    +    }
    +  }
    +
    +  if (!group_by_key_ids_.empty()) {
    +    // Aggregation with GROUP BY: create the hash table (pool).
    +    if (is_aggregate_collision_free_) {
    +      collision_free_hashtable_.reset(
    +          AggregationStateHashTableFactory::CreateResizable(
    +              hash_table_impl_type,
    +              group_by_types_,
    +              estimated_num_entries,
    +              group_by_handles,
    +              storage_manager));
    +    } else if (is_aggregate_partitioned_) {
    +      partitioned_group_by_hashtable_pool_.reset(
    +          new PartitionedHashTablePool(estimated_num_entries,
    +                                       FLAGS_num_aggregation_partitions,
    +                                       hash_table_impl_type,
    +                                       group_by_types_,
    +                                       group_by_handles,
    +                                       storage_manager));
    +    } else {
    +      group_by_hashtable_pool_.reset(
    --- End diff --
    
    I think `group_by_hashtable_pool_` could be the special case of `partitioned_group_by_hashtable_pool_` where the number of partition is one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505506
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
    --- End diff --
    
    Could you help me understand this return value?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99908259
  
    --- Diff: storage/AggregationOperationState.hpp ---
    @@ -156,6 +152,29 @@ class AggregationOperationState {
           const CatalogDatabaseLite &database);
     
       /**
    +   * @brief Get the number of partitions to be used for initializing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for initializing the aggregation.
    +   **/
    +  std::size_t getNumInitializationPartitions() const;
    +
    +  /**
    +   * @brief Get the number of partitions to be used for finalizing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for finalizing the aggregation.
    +   **/
    +  std::size_t getNumFinalizationPartitions() const;
    --- End diff --
    
    I get your point, but for the distributed version, my concern is that we have to schedule the finalize work order with a partition to the same node that executes the initialize work order. Thus, your approach makes it harder to implement in the distributed version.
    
    On the other hand, we could measure the overheads of the initialize work order, and I guess it is related small, compared to the whole aggregation processing time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99897239
  
    --- Diff: storage/AggregationOperationState.hpp ---
    @@ -156,6 +152,29 @@ class AggregationOperationState {
           const CatalogDatabaseLite &database);
     
       /**
    +   * @brief Get the number of partitions to be used for initializing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for initializing the aggregation.
    +   **/
    +  std::size_t getNumInitializationPartitions() const;
    +
    +  /**
    +   * @brief Get the number of partitions to be used for finalizing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for finalizing the aggregation.
    +   **/
    +  std::size_t getNumFinalizationPartitions() const;
    --- End diff --
    
    The number of partitions can be different for initialization and finalization.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99916031
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    --- End diff --
    
    The "partition" here is somehow not related to catalog or relations. I.e. it refers to the manual segmentation of the memory / hash table entries but not the partitioning of storage blocks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99874032
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    +    return false;
    +  }
    +
    +  // We need to know the exact min/max stats of the group-by key.
    +  // So it must be a CatalogAttribute (but not an expression).
    +  E::AttributeReferencePtr group_by_key_attr;
    +  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
    +  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
    +    return false;
    +  }
    +
    +  bool min_value_stat_is_exact;
    +  bool max_value_stat_is_exact;
    +  const TypedValue min_value =
    +      cost_model_for_aggregation_->findMinValueStat(
    +          aggregate, group_by_key_attr, &min_value_stat_is_exact);
    +  const TypedValue max_value =
    +      cost_model_for_aggregation_->findMaxValueStat(
    +          aggregate, group_by_key_attr, &max_value_stat_is_exact);
    +  if (min_value.isNull() || max_value.isNull() ||
    +      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
    +    return false;
    +  }
    +
    +  std::int64_t min_cpp_value;
    +  std::int64_t max_cpp_value;
    +  switch (group_by_key_attr->getValueType().getTypeID()) {
    +    case TypeID::kInt: {
    +      min_cpp_value = min_value.getLiteral<int>();
    +      max_cpp_value = max_value.getLiteral<int>();
    +      break;
    +    }
    +    case TypeID::kLong: {
    +      min_cpp_value = min_value.getLiteral<std::int64_t>();
    +      max_cpp_value = max_value.getLiteral<std::int64_t>();
    +      break;
    +    }
    +    default:
    +      return false;
    --- End diff --
    
    We can support more types later. For any type/any number of group-by keys, if we can define a one-to-one mapping function that maps the keys to range-bounded integers, then this aggregation is applicable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99517208
  
    --- Diff: storage/PackedPayloadHashTable.cpp ---
    @@ -0,0 +1,463 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/PackedPayloadHashTable.hpp"
    +
    +#include <algorithm>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstdlib>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageConstants.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/Type.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/Alignment.hpp"
    +#include "utility/Macros.hpp"
    +#include "utility/PrimeNumber.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +PackedPayloadHashTable::PackedPayloadHashTable(
    +    const std::vector<const Type *> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle *> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      num_handles_(handles.size()),
    +      handles_(handles),
    +      total_payload_size_(ComputeTotalPayloadSize(handles)),
    +      storage_manager_(storage_manager),
    +      kBucketAlignment(alignof(std::atomic<std::size_t>)),
    +      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
    +      key_manager_(key_types_, kValueOffset + total_payload_size_),
    +      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
    +  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
    +  for (const auto *handle : handles) {
    +    payload_offsets_.emplace_back(payload_offset_running_sum);
    +    payload_offset_running_sum += handle->getPayloadSize();
    +  }
    +
    +  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
    +  // init_payload to buckets if payload contains out of line data.
    +  init_payload_ =
    +      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
    +  DCHECK(init_payload_ != nullptr);
    +
    +  for (std::size_t i = 0; i < num_handles_; ++i) {
    +    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
    +  }
    +
    +  // Bucket size always rounds up to the alignment requirement of the atomic
    +  // size_t "next" pointer at the front or a ValueT, whichever is larger.
    +  //
    +  // Give base HashTable information about what key components are stored
    +  // inline from 'key_manager_'.
    +  setKeyInline(key_manager_.getKeyInline());
    +
    +  // Pick out a prime number of slots and calculate storage requirements.
    +  std::size_t num_slots_tmp =
    +      get_next_prime_number(num_entries * kHashTableLoadFactor);
    +  std::size_t required_memory =
    +      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
    +      (num_slots_tmp / kHashTableLoadFactor) *
    +          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
    +  std::size_t num_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(required_memory);
    +  if (num_storage_slots == 0) {
    +    FATAL_ERROR(
    +        "Storage requirement for SeparateChainingHashTable "
    +        "exceeds maximum allocation size.");
    +  }
    +
    +  // Get a StorageBlob to hold the hash table.
    +  const block_id blob_id =
    +      this->storage_manager_->createBlob(num_storage_slots);
    +  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
    +
    +  void *aligned_memory_start = this->blob_->getMemoryMutable();
    +  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
    +  if (align(alignof(Header),
    +            sizeof(Header),
    +            aligned_memory_start,
    +            available_memory) == nullptr) {
    +    // With current values from StorageConstants.hpp, this should be
    +    // impossible. A blob is at least 1 MB, while a Header has alignment
    +    // requirement of just kCacheLineBytes (64 bytes).
    +    FATAL_ERROR(
    +        "StorageBlob used to hold resizable "
    +        "SeparateChainingHashTable is too small to meet alignment "
    +        "requirements of SeparateChainingHashTable::Header.");
    +  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
    --- End diff --
    
    Refactor using `LOG_IF`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505112
  
    --- Diff: relational_operators/FinalizeAggregationOperator.hpp ---
    @@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
        * @note InsertWorkOrder takes ownership of \c state.
        *
        * @param query_id The ID of the query to which this operator belongs.
    +   * @param partition_id The partition ID for which the Finalize aggregation
    +   *        work order is issued.
        * @param state The AggregationState to use.
        * @param output_destination The InsertDestination to insert aggregation
        *        results.
    -   * @param part_id The partition ID for which the Finalize aggregation work
    -   *        order is issued. Ignore if aggregation is not partitioned.
        */
       FinalizeAggregationWorkOrder(const std::size_t query_id,
    +                               const std::size_t partition_id,
    --- End diff --
    
    Please replace `size_t` with `partition_id` defined in `catalog/CatalogTypedefs.hpp`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    Please resync with the master branch, and I will merge it. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99924782
  
    --- Diff: relational_operators/InitializeAggregationOperator.cpp ---
    @@ -0,0 +1,72 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "relational_operators/InitializeAggregationOperator.hpp"
    +
    +#include <cstddef>
    +
    +#include "query_execution/QueryContext.hpp"
    +#include "query_execution/WorkOrderProtosContainer.hpp"
    +#include "query_execution/WorkOrdersContainer.hpp"
    +#include "relational_operators/WorkOrder.pb.h"
    +#include "storage/AggregationOperationState.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace quickstep {
    +
    +bool InitializeAggregationOperator::getAllWorkOrders(
    +    WorkOrdersContainer *container,
    +    QueryContext *query_context,
    +    StorageManager *storage_manager,
    +    const tmb::client_id scheduler_client_id,
    +    tmb::MessageBus *bus) {
    +  if (!started_) {
    +    AggregationOperationState *agg_state =
    +        query_context->getAggregationState(aggr_state_index_);
    +    DCHECK(agg_state != nullptr);
    +
    +    for (std::size_t part_id = 0;
    +         part_id < agg_state->getNumInitializationPartitions();
    +         ++part_id) {
    +      container->addNormalWorkOrder(
    +          new InitializeAggregationWorkOrder(query_id_,
    +                                             part_id,
    +                                             agg_state),
    +          op_index_);
    +    }
    +    started_ = true;
    +  }
    +  return true;
    +}
    +
    +// TODO(quickstep-team) : Think about how the number of partitions could be
    +// accessed in this function. Until then, we can't use partitioned aggregation
    +// initialization with the distributed version.
    --- End diff --
    
    So, here we don't use one or multiple catalog attribute as the partition key in the inner-table? Then, how do we do partition? Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99914989
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    +  if (is_aggregate_collision_free_) {
    +    static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->initialize(partition_id);
    +  } else {
    +    LOG(FATAL) << "AggregationOperationState::initializeState() "
    --- End diff --
    
    If the code reaches here, then the system will probably actually crash. I.e. it is like a `CHECK` but we still put the `else` branch here for further updates.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99504991
  
    --- Diff: relational_operators/InitializeAggregationOperator.cpp ---
    @@ -0,0 +1,72 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "relational_operators/InitializeAggregationOperator.hpp"
    +
    +#include <cstddef>
    +
    +#include "query_execution/QueryContext.hpp"
    +#include "query_execution/WorkOrderProtosContainer.hpp"
    +#include "query_execution/WorkOrdersContainer.hpp"
    +#include "relational_operators/WorkOrder.pb.h"
    +#include "storage/AggregationOperationState.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace quickstep {
    +
    +bool InitializeAggregationOperator::getAllWorkOrders(
    +    WorkOrdersContainer *container,
    +    QueryContext *query_context,
    +    StorageManager *storage_manager,
    +    const tmb::client_id scheduler_client_id,
    +    tmb::MessageBus *bus) {
    +  if (!started_) {
    +    AggregationOperationState *agg_state =
    +        query_context->getAggregationState(aggr_state_index_);
    +    DCHECK(agg_state != nullptr);
    +
    +    for (std::size_t part_id = 0;
    +         part_id < agg_state->getNumInitializationPartitions();
    +         ++part_id) {
    +      container->addNormalWorkOrder(
    +          new InitializeAggregationWorkOrder(query_id_,
    +                                             part_id,
    +                                             agg_state),
    +          op_index_);
    +    }
    +    started_ = true;
    +  }
    +  return true;
    +}
    +
    +// TODO(quickstep-team) : Think about how the number of partitions could be
    +// accessed in this function. Until then, we can't use partitioned aggregation
    +// initialization with the distributed version.
    --- End diff --
    
    FYI, as a follow-up for the distributed version, we could do [similar](https://github.com/apache/incubator-quickstep/blob/master/query_execution/QueryManagerDistributed.hpp#L169) for partitioned hash join in the distributed execution engine.
    
    In short, one partition of all aggregate work orders (including init, aggr, and finalize) is alway scheduled on the same `Shiftboss`, but multiple partition could also be scheduled on the same `Shiftboss`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99918342
  
    --- Diff: storage/AggregationOperationState.hpp ---
    @@ -156,6 +152,29 @@ class AggregationOperationState {
           const CatalogDatabaseLite &database);
     
       /**
    +   * @brief Get the number of partitions to be used for initializing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for initializing the aggregation.
    +   **/
    +  std::size_t getNumInitializationPartitions() const;
    +
    +  /**
    +   * @brief Get the number of partitions to be used for finalizing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for finalizing the aggregation.
    +   **/
    +  std::size_t getNumFinalizationPartitions() const;
    --- End diff --
    
    In the distributed scenario, you may need the table to be "really partitioned" (i.e. divided into several tables) in a different sense of what partitions refer to here. The partitions here cannot be scheduled to difference nodes since they have to update the same table.
    
    So later when we make `CollisionFreeVectorTable` support really partitioned input relations, we will can rename all the "partitions" here to some other name, say `segments`.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    For the question about `PartitionedHashTablePool` and `HashTablePool`. Note that their use patterns are different so perhaps it is not natural to merge them into one class.
    
    `PartitionedHashTablePool` creates **a fixed number** of hash tables **on its construction**. The use pattern is that every `AggregationWorkOrder` updates **all** of these hash tables and every `FinalizeAggregationWorkOrder` updates one of these hash tables.
    
    `HashTablePool` creates hash tables **on demand**. The current use pattern is that every `AggregationWorkOrder` checkouts **exclusively** one hash table, updates the hash table, and returns the hash table back to the pool. Then only one `FinalizeAggregationWorkOrder` is created to merge all the tables in the pool to the final hash table.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99504309
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    +    return false;
    +  }
    +
    +  // We need to know the exact min/max stats of the group-by key.
    +  // So it must be a CatalogAttribute (but not an expression).
    +  E::AttributeReferencePtr group_by_key_attr;
    +  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
    +  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
    +    return false;
    +  }
    +
    +  bool min_value_stat_is_exact;
    +  bool max_value_stat_is_exact;
    +  const TypedValue min_value =
    +      cost_model_for_aggregation_->findMinValueStat(
    +          aggregate, group_by_key_attr, &min_value_stat_is_exact);
    +  const TypedValue max_value =
    +      cost_model_for_aggregation_->findMaxValueStat(
    +          aggregate, group_by_key_attr, &max_value_stat_is_exact);
    +  if (min_value.isNull() || max_value.isNull() ||
    +      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
    +    return false;
    +  }
    +
    +  std::int64_t min_cpp_value;
    +  std::int64_t max_cpp_value;
    +  switch (group_by_key_attr->getValueType().getTypeID()) {
    +    case TypeID::kInt: {
    +      min_cpp_value = min_value.getLiteral<int>();
    +      max_cpp_value = max_value.getLiteral<int>();
    +      break;
    +    }
    +    case TypeID::kLong: {
    +      min_cpp_value = min_value.getLiteral<std::int64_t>();
    +      max_cpp_value = max_value.getLiteral<std::int64_t>();
    +      break;
    +    }
    +    default:
    +      return false;
    +  }
    +
    +  // TODO(jianqiao):
    +  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
    +  // 2. Reason about the table size bound (e.g. by checking memory size) instead
    +  //    of hardcoding it as a gflag.
    +  if (min_cpp_value < 0 ||
    +      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
    +      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
    +    return false;
    +  }
    +
    +  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
    +    const E::AggregateFunctionPtr agg_func =
    +        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
    +
    +    if (agg_func->is_distinct()) {
    +      return false;
    +    }
    +
    +    // TODO(jianqiao): Support AggregationID::AVG.
    +    switch (agg_func->getAggregate().getAggregationID()) {
    +      case AggregationID::kCount:  // Fall through
    +      case AggregationID::kSum:
    +        break;
    +      default:
    +        return false;
    +    }
    +
    +    const auto &arguments = agg_func->getArguments();
    +    if (arguments.size() > 1) {
    +      return false;
    +    }
    +
    +    if (arguments.size() == 1) {
    +      switch (arguments.front()->getValueType().getTypeID()) {
    --- End diff --
    
    Ditto for refactoring using `QUICKSTEP_EQUALS_ANY_CONSTANT`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99915265
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
    --- End diff --
    
    Currently we create the `InitializeAggregationOperator` only for collision-free aggregations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99901875
  
    --- Diff: storage/AggregationOperationState.hpp ---
    @@ -156,6 +152,29 @@ class AggregationOperationState {
           const CatalogDatabaseLite &database);
     
       /**
    +   * @brief Get the number of partitions to be used for initializing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for initializing the aggregation.
    +   **/
    +  std::size_t getNumInitializationPartitions() const;
    +
    +  /**
    +   * @brief Get the number of partitions to be used for finalizing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for finalizing the aggregation.
    +   **/
    +  std::size_t getNumFinalizationPartitions() const;
    --- End diff --
    
    Currently initialization is for `memset` the storage memory. It is much faster than finalization so we expect to have a smaller number of initialization partitions than finalization partitions if the storage memory is not large.
    
    E.g. suppose the vector table has 1 million entries of `int` keys and `std::int64_t` states -- amount to 12MB storage memory. Then we may want 80 partitions to finalize the table, but would not create 80 work orders to initialize the 12MB memory.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99897008
  
    --- Diff: storage/PackedPayloadHashTable.cpp ---
    @@ -0,0 +1,463 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/PackedPayloadHashTable.hpp"
    +
    +#include <algorithm>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstdlib>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageConstants.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/Type.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/Alignment.hpp"
    +#include "utility/Macros.hpp"
    +#include "utility/PrimeNumber.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +PackedPayloadHashTable::PackedPayloadHashTable(
    +    const std::vector<const Type *> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle *> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      num_handles_(handles.size()),
    +      handles_(handles),
    +      total_payload_size_(ComputeTotalPayloadSize(handles)),
    +      storage_manager_(storage_manager),
    +      kBucketAlignment(alignof(std::atomic<std::size_t>)),
    +      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
    +      key_manager_(key_types_, kValueOffset + total_payload_size_),
    +      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
    +  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
    +  for (const auto *handle : handles) {
    +    payload_offsets_.emplace_back(payload_offset_running_sum);
    +    payload_offset_running_sum += handle->getPayloadSize();
    +  }
    +
    +  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
    +  // init_payload to buckets if payload contains out of line data.
    +  init_payload_ =
    +      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
    +  DCHECK(init_payload_ != nullptr);
    +
    +  for (std::size_t i = 0; i < num_handles_; ++i) {
    +    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
    +  }
    +
    +  // Bucket size always rounds up to the alignment requirement of the atomic
    +  // size_t "next" pointer at the front or a ValueT, whichever is larger.
    +  //
    +  // Give base HashTable information about what key components are stored
    +  // inline from 'key_manager_'.
    +  setKeyInline(key_manager_.getKeyInline());
    +
    +  // Pick out a prime number of slots and calculate storage requirements.
    +  std::size_t num_slots_tmp =
    +      get_next_prime_number(num_entries * kHashTableLoadFactor);
    +  std::size_t required_memory =
    +      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
    +      (num_slots_tmp / kHashTableLoadFactor) *
    +          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
    +  std::size_t num_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(required_memory);
    +  if (num_storage_slots == 0) {
    --- End diff --
    
    All the code in `PackedPayloadHashTable` is from the original `HashTable` implementation. We may have an overall code-style revision later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505104
  
    --- Diff: relational_operators/InitializeAggregationOperator.cpp ---
    @@ -0,0 +1,72 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "relational_operators/InitializeAggregationOperator.hpp"
    +
    +#include <cstddef>
    +
    +#include "query_execution/QueryContext.hpp"
    +#include "query_execution/WorkOrderProtosContainer.hpp"
    +#include "query_execution/WorkOrdersContainer.hpp"
    +#include "relational_operators/WorkOrder.pb.h"
    +#include "storage/AggregationOperationState.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace quickstep {
    +
    +bool InitializeAggregationOperator::getAllWorkOrders(
    +    WorkOrdersContainer *container,
    +    QueryContext *query_context,
    +    StorageManager *storage_manager,
    +    const tmb::client_id scheduler_client_id,
    +    tmb::MessageBus *bus) {
    +  if (!started_) {
    +    AggregationOperationState *agg_state =
    +        query_context->getAggregationState(aggr_state_index_);
    +    DCHECK(agg_state != nullptr);
    +
    +    for (std::size_t part_id = 0;
    --- End diff --
    
    Please replace `size_t` with `partition_id` defined in `catalog/CatalogTypedefs.hpp`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99916086
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState(
         const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
         StorageManager *storage_manager)
         : input_relation_(input_relation),
    -      is_aggregate_partitioned_(checkAggregatePartitioned(
    -          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
    +      is_aggregate_collision_free_(false),
    +      is_aggregate_partitioned_(false),
           predicate_(predicate),
    -      group_by_list_(std::move(group_by)),
    -      arguments_(std::move(arguments)),
           is_distinct_(std::move(is_distinct)),
           storage_manager_(storage_manager) {
    +  if (!group_by.empty()) {
    +    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
    +      is_aggregate_collision_free_ = true;
    +    } else {
    +      is_aggregate_partitioned_ = checkAggregatePartitioned(
    +          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
    +    }
    +  }
    +
       // Sanity checks: each aggregate has a corresponding list of arguments.
    -  DCHECK(aggregate_functions.size() == arguments_.size());
    +  DCHECK(aggregate_functions.size() == arguments.size());
     
       // Get the types of GROUP BY expressions for creating HashTables below.
    -  std::vector<const Type *> group_by_types;
    -  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
    -    group_by_types.emplace_back(&group_by_element->getType());
    +  for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    group_by_types_.emplace_back(&group_by_element->getType());
    +  }
    +
    +  // Prepare group-by key ids and non-trivial expressions.
    +  for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    const attribute_id attr_id =
    +        group_by_element->getAttributeIdForValueAccessor();
    +    if (attr_id != kInvalidAttributeID) {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
    +    } else {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
    +                                     non_trivial_expressions_.size());
    +      non_trivial_expressions_.emplace_back(group_by_element.release());
    +    }
       }
     
       std::vector<AggregationHandle *> group_by_handles;
    -  group_by_handles.clear();
    -
    -  if (aggregate_functions.size() == 0) {
    -    // If there is no aggregation function, then it is a distinctify operation
    -    // on the group-by expressions.
    -    DCHECK_GT(group_by_list_.size(), 0u);
    -
    -    handles_.emplace_back(new AggregationHandleDistinct());
    -    arguments_.push_back({});
    -    is_distinct_.emplace_back(false);
    -    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                     hash_table_impl_type,
    -                                                     group_by_types,
    -                                                     {1},
    -                                                     handles_,
    -                                                     storage_manager));
    -  } else {
    -    // Set up each individual aggregate in this operation.
    -    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    -        aggregate_functions.begin();
    -    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
    -        args_it = arguments_.begin();
    -    std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    -    std::vector<HashTableImplType>::const_iterator
    -        distinctify_hash_table_impl_types_it =
    -            distinctify_hash_table_impl_types.begin();
    -    std::vector<std::size_t> payload_sizes;
    -    for (; agg_func_it != aggregate_functions.end();
    -         ++agg_func_it, ++args_it, ++is_distinct_it) {
    -      // Get the Types of this aggregate's arguments so that we can create an
    -      // AggregationHandle.
    -      std::vector<const Type *> argument_types;
    -      for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -        argument_types.emplace_back(&argument->getType());
    -      }
     
    -      // Sanity checks: aggregate function exists and can apply to the specified
    -      // arguments.
    -      DCHECK(*agg_func_it != nullptr);
    -      DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
    -
    -      // Have the AggregateFunction create an AggregationHandle that we can use
    -      // to do actual aggregate computation.
    -      handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    -
    -      if (!group_by_list_.empty()) {
    -        // Aggregation with GROUP BY: combined payload is partially updated in
    -        // the presence of DISTINCT.
    -        if (*is_distinct_it) {
    -          handles_.back()->blockUpdate();
    -        }
    -        group_by_handles.emplace_back(handles_.back());
    -        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
    +  // Set up each individual aggregate in this operation.
    +  std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    +      aggregate_functions.begin();
    +  std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
    +      args_it = arguments.begin();
    +  std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    +  std::vector<HashTableImplType>::const_iterator
    +      distinctify_hash_table_impl_types_it =
    +          distinctify_hash_table_impl_types.begin();
    +  for (; agg_func_it != aggregate_functions.end();
    +       ++agg_func_it, ++args_it, ++is_distinct_it) {
    +    // Get the Types of this aggregate's arguments so that we can create an
    +    // AggregationHandle.
    +    std::vector<const Type *> argument_types;
    +    for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    +      argument_types.emplace_back(&argument->getType());
    +    }
    +
    +    // Prepare argument attribute ids and non-trivial expressions.
    +    std::vector<MultiSourceAttributeId> argument_ids;
    +    for (std::unique_ptr<const Scalar> &argument : *args_it) {
    +      const attribute_id attr_id =
    +          argument->getAttributeIdForValueAccessor();
    +      if (attr_id != kInvalidAttributeID) {
    +        argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
           } else {
    -        // Aggregation without GROUP BY: create a single global state.
    -        single_states_.emplace_back(handles_.back()->createInitialState());
    -
    -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
    -        // See if all of this aggregate's arguments are attributes in the input
    -        // relation. If so, remember the attribute IDs so that we can do copy
    -        // elision when actually performing the aggregation.
    -        std::vector<attribute_id> local_arguments_as_attributes;
    -        local_arguments_as_attributes.reserve(args_it->size());
    -        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -          const attribute_id argument_id =
    -              argument->getAttributeIdForValueAccessor();
    -          if (argument_id == -1) {
    -            local_arguments_as_attributes.clear();
    -            break;
    -          } else {
    -            DCHECK_EQ(input_relation_.getID(),
    -                      argument->getRelationIdForValueAccessor());
    -            local_arguments_as_attributes.push_back(argument_id);
    -          }
    -        }
    -
    -        arguments_as_attributes_.emplace_back(
    -            std::move(local_arguments_as_attributes));
    -#endif
    +        argument_ids.emplace_back(ValueAccessorSource::kDerived,
    +                                  non_trivial_expressions_.size());
    +        non_trivial_expressions_.emplace_back(argument.release());
           }
    +    }
    +    argument_ids_.emplace_back(std::move(argument_ids));
    +
    +    // Sanity checks: aggregate function exists and can apply to the specified
    +    // arguments.
    +    DCHECK(*agg_func_it != nullptr);
    +    DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
     
    -      // Initialize the corresponding distinctify hash table if this is a
    -      // DISTINCT aggregation.
    +    // Have the AggregateFunction create an AggregationHandle that we can use
    +    // to do actual aggregate computation.
    +    handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    +
    +    if (!group_by_key_ids_.empty()) {
    +      // Aggregation with GROUP BY: combined payload is partially updated in
    +      // the presence of DISTINCT.
           if (*is_distinct_it) {
    -        std::vector<const Type *> key_types(group_by_types);
    -        key_types.insert(
    -            key_types.end(), argument_types.begin(), argument_types.end());
    -        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    -        // estimating the number of entries in the distinctify hash table.
    -        // We may estimate for each distinct aggregation an
    -        // estimated_num_distinct_keys value during query optimization, if it's
    -        // worth.
    -        distinctify_hashtables_.emplace_back(
    -            AggregationStateFastHashTableFactory::CreateResizable(
    -                *distinctify_hash_table_impl_types_it,
    -                key_types,
    -                estimated_num_entries,
    -                {0},
    -                {},
    -                storage_manager));
    -        ++distinctify_hash_table_impl_types_it;
    -      } else {
    -        distinctify_hashtables_.emplace_back(nullptr);
    +        handles_.back()->blockUpdate();
           }
    +      group_by_handles.emplace_back(handles_.back().get());
    +    } else {
    +      // Aggregation without GROUP BY: create a single global state.
    +      single_states_.emplace_back(handles_.back()->createInitialState());
         }
     
    -    if (!group_by_handles.empty()) {
    -      // Aggregation with GROUP BY: create a HashTable pool.
    -      if (!is_aggregate_partitioned_) {
    -        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                         hash_table_impl_type,
    -                                                         group_by_types,
    -                                                         payload_sizes,
    -                                                         group_by_handles,
    -                                                         storage_manager));
    -      } else {
    -        partitioned_group_by_hashtable_pool_.reset(
    -            new PartitionedHashTablePool(estimated_num_entries,
    -                                         FLAGS_num_aggregation_partitions,
    -                                         hash_table_impl_type,
    -                                         group_by_types,
    -                                         payload_sizes,
    -                                         group_by_handles,
    -                                         storage_manager));
    -      }
    +    // Initialize the corresponding distinctify hash table if this is a
    +    // DISTINCT aggregation.
    +    if (*is_distinct_it) {
    +      std::vector<const Type *> key_types(group_by_types_);
    +      key_types.insert(
    +          key_types.end(), argument_types.begin(), argument_types.end());
    +      // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    +      // estimating the number of entries in the distinctify hash table.
    +      // We need to estimate for each distinct aggregation an
    +      // estimated_num_distinct_keys value during query optimization.
    +      distinctify_hashtables_.emplace_back(
    +          AggregationStateHashTableFactory::CreateResizable(
    +              *distinctify_hash_table_impl_types_it,
    +              key_types,
    +              estimated_num_entries,
    +              {},
    --- End diff --
    
    Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99516978
  
    --- Diff: storage/PackedPayloadHashTable.cpp ---
    @@ -0,0 +1,463 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/PackedPayloadHashTable.hpp"
    +
    +#include <algorithm>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstdlib>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageConstants.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/Type.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/Alignment.hpp"
    +#include "utility/Macros.hpp"
    +#include "utility/PrimeNumber.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +PackedPayloadHashTable::PackedPayloadHashTable(
    +    const std::vector<const Type *> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle *> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      num_handles_(handles.size()),
    +      handles_(handles),
    +      total_payload_size_(ComputeTotalPayloadSize(handles)),
    +      storage_manager_(storage_manager),
    +      kBucketAlignment(alignof(std::atomic<std::size_t>)),
    +      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
    +      key_manager_(key_types_, kValueOffset + total_payload_size_),
    +      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
    +  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
    +  for (const auto *handle : handles) {
    +    payload_offsets_.emplace_back(payload_offset_running_sum);
    +    payload_offset_running_sum += handle->getPayloadSize();
    +  }
    +
    +  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
    +  // init_payload to buckets if payload contains out of line data.
    +  init_payload_ =
    +      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
    +  DCHECK(init_payload_ != nullptr);
    +
    +  for (std::size_t i = 0; i < num_handles_; ++i) {
    +    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
    +  }
    +
    +  // Bucket size always rounds up to the alignment requirement of the atomic
    +  // size_t "next" pointer at the front or a ValueT, whichever is larger.
    +  //
    +  // Give base HashTable information about what key components are stored
    +  // inline from 'key_manager_'.
    +  setKeyInline(key_manager_.getKeyInline());
    +
    +  // Pick out a prime number of slots and calculate storage requirements.
    +  std::size_t num_slots_tmp =
    +      get_next_prime_number(num_entries * kHashTableLoadFactor);
    +  std::size_t required_memory =
    +      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
    +      (num_slots_tmp / kHashTableLoadFactor) *
    +          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
    +  std::size_t num_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(required_memory);
    +  if (num_storage_slots == 0) {
    --- End diff --
    
    Refactor using `CHECK`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99517280
  
    --- Diff: storage/PackedPayloadHashTable.cpp ---
    @@ -0,0 +1,463 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "storage/PackedPayloadHashTable.hpp"
    +
    +#include <algorithm>
    +#include <cstddef>
    +#include <cstdint>
    +#include <cstdlib>
    +#include <vector>
    +
    +#include "expressions/aggregation/AggregationHandle.hpp"
    +#include "storage/HashTableKeyManager.hpp"
    +#include "storage/StorageBlob.hpp"
    +#include "storage/StorageBlockInfo.hpp"
    +#include "storage/StorageConstants.hpp"
    +#include "storage/StorageManager.hpp"
    +#include "storage/ValueAccessor.hpp"
    +#include "storage/ValueAccessorMultiplexer.hpp"
    +#include "threading/SpinMutex.hpp"
    +#include "threading/SpinSharedMutex.hpp"
    +#include "types/Type.hpp"
    +#include "types/containers/ColumnVectorsValueAccessor.hpp"
    +#include "utility/Alignment.hpp"
    +#include "utility/Macros.hpp"
    +#include "utility/PrimeNumber.hpp"
    +
    +#include "glog/logging.h"
    +
    +namespace quickstep {
    +
    +PackedPayloadHashTable::PackedPayloadHashTable(
    +    const std::vector<const Type *> &key_types,
    +    const std::size_t num_entries,
    +    const std::vector<AggregationHandle *> &handles,
    +    StorageManager *storage_manager)
    +    : key_types_(key_types),
    +      num_handles_(handles.size()),
    +      handles_(handles),
    +      total_payload_size_(ComputeTotalPayloadSize(handles)),
    +      storage_manager_(storage_manager),
    +      kBucketAlignment(alignof(std::atomic<std::size_t>)),
    +      kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
    +      key_manager_(key_types_, kValueOffset + total_payload_size_),
    +      bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
    +  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
    +  for (const auto *handle : handles) {
    +    payload_offsets_.emplace_back(payload_offset_running_sum);
    +    payload_offset_running_sum += handle->getPayloadSize();
    +  }
    +
    +  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
    +  // init_payload to buckets if payload contains out of line data.
    +  init_payload_ =
    +      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
    +  DCHECK(init_payload_ != nullptr);
    +
    +  for (std::size_t i = 0; i < num_handles_; ++i) {
    +    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
    +  }
    +
    +  // Bucket size always rounds up to the alignment requirement of the atomic
    +  // size_t "next" pointer at the front or a ValueT, whichever is larger.
    +  //
    +  // Give base HashTable information about what key components are stored
    +  // inline from 'key_manager_'.
    +  setKeyInline(key_manager_.getKeyInline());
    +
    +  // Pick out a prime number of slots and calculate storage requirements.
    +  std::size_t num_slots_tmp =
    +      get_next_prime_number(num_entries * kHashTableLoadFactor);
    +  std::size_t required_memory =
    +      sizeof(Header) + num_slots_tmp * sizeof(std::atomic<std::size_t>) +
    +      (num_slots_tmp / kHashTableLoadFactor) *
    +          (bucket_size_ + key_manager_.getEstimatedVariableKeySize());
    +  std::size_t num_storage_slots =
    +      this->storage_manager_->SlotsNeededForBytes(required_memory);
    +  if (num_storage_slots == 0) {
    +    FATAL_ERROR(
    +        "Storage requirement for SeparateChainingHashTable "
    +        "exceeds maximum allocation size.");
    +  }
    +
    +  // Get a StorageBlob to hold the hash table.
    +  const block_id blob_id =
    +      this->storage_manager_->createBlob(num_storage_slots);
    +  this->blob_ = this->storage_manager_->getBlobMutable(blob_id);
    +
    +  void *aligned_memory_start = this->blob_->getMemoryMutable();
    +  std::size_t available_memory = num_storage_slots * kSlotSizeBytes;
    +  if (align(alignof(Header),
    +            sizeof(Header),
    +            aligned_memory_start,
    +            available_memory) == nullptr) {
    +    // With current values from StorageConstants.hpp, this should be
    +    // impossible. A blob is at least 1 MB, while a Header has alignment
    +    // requirement of just kCacheLineBytes (64 bytes).
    +    FATAL_ERROR(
    +        "StorageBlob used to hold resizable "
    +        "SeparateChainingHashTable is too small to meet alignment "
    +        "requirements of SeparateChainingHashTable::Header.");
    +  } else if (aligned_memory_start != this->blob_->getMemoryMutable()) {
    +    // This should also be impossible, since the StorageManager allocates slots
    +    // aligned to kCacheLineBytes.
    +    DEV_WARNING("StorageBlob memory adjusted by "
    +                << (num_storage_slots * kSlotSizeBytes - available_memory)
    +                << " bytes to meet alignment requirement for "
    +                << "SeparateChainingHashTable::Header.");
    +  }
    +
    +  // Locate the header.
    +  header_ = static_cast<Header *>(aligned_memory_start);
    +  aligned_memory_start =
    +      static_cast<char *>(aligned_memory_start) + sizeof(Header);
    +  available_memory -= sizeof(Header);
    +
    +  // Recompute the number of slots & buckets using the actual available memory.
    +  // Most likely, we got some extra free bucket space due to "rounding up" to
    +  // the storage blob's size. It's also possible (though very unlikely) that we
    +  // will wind up with fewer buckets than we initially wanted because of screwy
    +  // alignment requirements for ValueT.
    +  std::size_t num_buckets_tmp =
    +      available_memory /
    +      (kHashTableLoadFactor * sizeof(std::atomic<std::size_t>) + bucket_size_ +
    +       key_manager_.getEstimatedVariableKeySize());
    +  num_slots_tmp =
    +      get_previous_prime_number(num_buckets_tmp * kHashTableLoadFactor);
    +  num_buckets_tmp = num_slots_tmp / kHashTableLoadFactor;
    +  DEBUG_ASSERT(num_slots_tmp > 0);
    +  DEBUG_ASSERT(num_buckets_tmp > 0);
    +
    +  // Locate the slot array.
    +  slots_ = static_cast<std::atomic<std::size_t> *>(aligned_memory_start);
    +  aligned_memory_start = static_cast<char *>(aligned_memory_start) +
    +                         sizeof(std::atomic<std::size_t>) * num_slots_tmp;
    +  available_memory -= sizeof(std::atomic<std::size_t>) * num_slots_tmp;
    +
    +  // Locate the buckets.
    +  buckets_ = aligned_memory_start;
    +  // Extra-paranoid: If ValueT has an alignment requirement greater than that
    +  // of std::atomic<std::size_t>, we may need to adjust the start of the bucket
    +  // array.
    +  if (align(kBucketAlignment, bucket_size_, buckets_, available_memory) ==
    +      nullptr) {
    +    FATAL_ERROR(
    +        "StorageBlob used to hold resizable "
    +        "SeparateChainingHashTable is too small to meet "
    +        "alignment requirements of buckets.");
    +  } else if (buckets_ != aligned_memory_start) {
    +    DEV_WARNING(
    +        "Bucket array start position adjusted to meet alignment "
    +        "requirement for SeparateChainingHashTable's value type.");
    +    if (num_buckets_tmp * bucket_size_ > available_memory) {
    +      --num_buckets_tmp;
    +    }
    +  }
    +
    +  // Fill in the header.
    +  header_->num_slots = num_slots_tmp;
    +  header_->num_buckets = num_buckets_tmp;
    +  header_->buckets_allocated.store(0, std::memory_order_relaxed);
    +  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
    +  available_memory -= bucket_size_ * (header_->num_buckets);
    +
    +  // Locate variable-length key storage region, and give it all the remaining
    +  // bytes in the blob.
    +  key_manager_.setVariableLengthStorageInfo(
    +      static_cast<char *>(buckets_) + header_->num_buckets * bucket_size_,
    +      available_memory,
    +      &(header_->variable_length_bytes_allocated));
    +}
    +
    +PackedPayloadHashTable::~PackedPayloadHashTable() {
    +  if (blob_.valid()) {
    +    const block_id blob_id = blob_->getID();
    +    blob_.release();
    +    storage_manager_->deleteBlockOrBlobFile(blob_id);
    +  }
    +  std::free(init_payload_);
    +}
    +
    +void PackedPayloadHashTable::clear() {
    +  const std::size_t used_buckets =
    +      header_->buckets_allocated.load(std::memory_order_relaxed);
    +  // Destroy existing values, if necessary.
    +  destroyPayload();
    +
    +  // Zero-out slot array.
    +  std::memset(
    +      slots_, 0x0, sizeof(std::atomic<std::size_t>) * header_->num_slots);
    +
    +  // Zero-out used buckets.
    +  std::memset(buckets_, 0x0, used_buckets * bucket_size_);
    +
    +  header_->buckets_allocated.store(0, std::memory_order_relaxed);
    +  header_->variable_length_bytes_allocated.store(0, std::memory_order_relaxed);
    +  key_manager_.zeroNextVariableLengthKeyOffset();
    +}
    +
    +void PackedPayloadHashTable::destroyPayload() {
    +  const std::size_t num_buckets =
    +      header_->buckets_allocated.load(std::memory_order_relaxed);
    +  void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
    +  for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
    +    for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
    +      void *value_internal_ptr =
    +          static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
    +      handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
    +    }
    +    bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
    +  }
    +}
    +
    +bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
    +    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
    +    const std::vector<MultiSourceAttributeId> &key_attr_ids,
    +    const ValueAccessorMultiplexer &accessor_mux) {
    +  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
    +  ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
    +
    +  base_accessor->beginIterationVirtual();
    +  if (derived_accessor == nullptr) {
    +    return upsertValueAccessorCompositeKeyInternal<false>(
    +        argument_ids,
    +        key_attr_ids,
    +        base_accessor,
    +        nullptr);
    +  } else {
    +    DCHECK(derived_accessor->getImplementationType()
    +               == ValueAccessor::Implementation::kColumnVectors);
    +    derived_accessor->beginIterationVirtual();
    +    return upsertValueAccessorCompositeKeyInternal<true>(
    +        argument_ids,
    +        key_attr_ids,
    +        base_accessor,
    +        static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
    +  }
    +}
    +
    +void PackedPayloadHashTable::resize(const std::size_t extra_buckets,
    +                                    const std::size_t extra_variable_storage,
    +                                    const std::size_t retry_num) {
    +  // A retry should never be necessary with this implementation of HashTable.
    +  // Separate chaining ensures that any resized hash table with more buckets
    +  // than the original table will be able to hold more entries than the
    +  // original.
    +  DEBUG_ASSERT(retry_num == 0);
    --- End diff --
    
    Use `DCHECK_EQ` instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99505252
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -80,148 +83,145 @@ AggregationOperationState::AggregationOperationState(
         const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
         StorageManager *storage_manager)
         : input_relation_(input_relation),
    -      is_aggregate_partitioned_(checkAggregatePartitioned(
    -          estimated_num_entries, is_distinct, group_by, aggregate_functions)),
    +      is_aggregate_collision_free_(false),
    +      is_aggregate_partitioned_(false),
           predicate_(predicate),
    -      group_by_list_(std::move(group_by)),
    -      arguments_(std::move(arguments)),
           is_distinct_(std::move(is_distinct)),
           storage_manager_(storage_manager) {
    +  if (!group_by.empty()) {
    +    if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
    +      is_aggregate_collision_free_ = true;
    +    } else {
    +      is_aggregate_partitioned_ = checkAggregatePartitioned(
    +          estimated_num_entries, is_distinct_, group_by, aggregate_functions);
    +    }
    +  }
    +
       // Sanity checks: each aggregate has a corresponding list of arguments.
    -  DCHECK(aggregate_functions.size() == arguments_.size());
    +  DCHECK(aggregate_functions.size() == arguments.size());
     
       // Get the types of GROUP BY expressions for creating HashTables below.
    -  std::vector<const Type *> group_by_types;
    -  for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
    -    group_by_types.emplace_back(&group_by_element->getType());
    +  for (const std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    group_by_types_.emplace_back(&group_by_element->getType());
    +  }
    +
    +  // Prepare group-by key ids and non-trivial expressions.
    +  for (std::unique_ptr<const Scalar> &group_by_element : group_by) {
    +    const attribute_id attr_id =
    +        group_by_element->getAttributeIdForValueAccessor();
    +    if (attr_id != kInvalidAttributeID) {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kBase, attr_id);
    +    } else {
    +      group_by_key_ids_.emplace_back(ValueAccessorSource::kDerived,
    +                                     non_trivial_expressions_.size());
    +      non_trivial_expressions_.emplace_back(group_by_element.release());
    +    }
       }
     
       std::vector<AggregationHandle *> group_by_handles;
    -  group_by_handles.clear();
    -
    -  if (aggregate_functions.size() == 0) {
    -    // If there is no aggregation function, then it is a distinctify operation
    -    // on the group-by expressions.
    -    DCHECK_GT(group_by_list_.size(), 0u);
    -
    -    handles_.emplace_back(new AggregationHandleDistinct());
    -    arguments_.push_back({});
    -    is_distinct_.emplace_back(false);
    -    group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                     hash_table_impl_type,
    -                                                     group_by_types,
    -                                                     {1},
    -                                                     handles_,
    -                                                     storage_manager));
    -  } else {
    -    // Set up each individual aggregate in this operation.
    -    std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    -        aggregate_functions.begin();
    -    std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
    -        args_it = arguments_.begin();
    -    std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    -    std::vector<HashTableImplType>::const_iterator
    -        distinctify_hash_table_impl_types_it =
    -            distinctify_hash_table_impl_types.begin();
    -    std::vector<std::size_t> payload_sizes;
    -    for (; agg_func_it != aggregate_functions.end();
    -         ++agg_func_it, ++args_it, ++is_distinct_it) {
    -      // Get the Types of this aggregate's arguments so that we can create an
    -      // AggregationHandle.
    -      std::vector<const Type *> argument_types;
    -      for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -        argument_types.emplace_back(&argument->getType());
    -      }
     
    -      // Sanity checks: aggregate function exists and can apply to the specified
    -      // arguments.
    -      DCHECK(*agg_func_it != nullptr);
    -      DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
    -
    -      // Have the AggregateFunction create an AggregationHandle that we can use
    -      // to do actual aggregate computation.
    -      handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    -
    -      if (!group_by_list_.empty()) {
    -        // Aggregation with GROUP BY: combined payload is partially updated in
    -        // the presence of DISTINCT.
    -        if (*is_distinct_it) {
    -          handles_.back()->blockUpdate();
    -        }
    -        group_by_handles.emplace_back(handles_.back());
    -        payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
    +  // Set up each individual aggregate in this operation.
    +  std::vector<const AggregateFunction *>::const_iterator agg_func_it =
    +      aggregate_functions.begin();
    +  std::vector<std::vector<std::unique_ptr<const Scalar>>>::iterator
    +      args_it = arguments.begin();
    +  std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
    +  std::vector<HashTableImplType>::const_iterator
    +      distinctify_hash_table_impl_types_it =
    +          distinctify_hash_table_impl_types.begin();
    +  for (; agg_func_it != aggregate_functions.end();
    +       ++agg_func_it, ++args_it, ++is_distinct_it) {
    +    // Get the Types of this aggregate's arguments so that we can create an
    +    // AggregationHandle.
    +    std::vector<const Type *> argument_types;
    +    for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    +      argument_types.emplace_back(&argument->getType());
    +    }
    +
    +    // Prepare argument attribute ids and non-trivial expressions.
    +    std::vector<MultiSourceAttributeId> argument_ids;
    +    for (std::unique_ptr<const Scalar> &argument : *args_it) {
    +      const attribute_id attr_id =
    +          argument->getAttributeIdForValueAccessor();
    +      if (attr_id != kInvalidAttributeID) {
    +        argument_ids.emplace_back(ValueAccessorSource::kBase, attr_id);
           } else {
    -        // Aggregation without GROUP BY: create a single global state.
    -        single_states_.emplace_back(handles_.back()->createInitialState());
    -
    -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
    -        // See if all of this aggregate's arguments are attributes in the input
    -        // relation. If so, remember the attribute IDs so that we can do copy
    -        // elision when actually performing the aggregation.
    -        std::vector<attribute_id> local_arguments_as_attributes;
    -        local_arguments_as_attributes.reserve(args_it->size());
    -        for (const std::unique_ptr<const Scalar> &argument : *args_it) {
    -          const attribute_id argument_id =
    -              argument->getAttributeIdForValueAccessor();
    -          if (argument_id == -1) {
    -            local_arguments_as_attributes.clear();
    -            break;
    -          } else {
    -            DCHECK_EQ(input_relation_.getID(),
    -                      argument->getRelationIdForValueAccessor());
    -            local_arguments_as_attributes.push_back(argument_id);
    -          }
    -        }
    -
    -        arguments_as_attributes_.emplace_back(
    -            std::move(local_arguments_as_attributes));
    -#endif
    +        argument_ids.emplace_back(ValueAccessorSource::kDerived,
    +                                  non_trivial_expressions_.size());
    +        non_trivial_expressions_.emplace_back(argument.release());
           }
    +    }
    +    argument_ids_.emplace_back(std::move(argument_ids));
    +
    +    // Sanity checks: aggregate function exists and can apply to the specified
    +    // arguments.
    +    DCHECK(*agg_func_it != nullptr);
    +    DCHECK((*agg_func_it)->canApplyToTypes(argument_types));
     
    -      // Initialize the corresponding distinctify hash table if this is a
    -      // DISTINCT aggregation.
    +    // Have the AggregateFunction create an AggregationHandle that we can use
    +    // to do actual aggregate computation.
    +    handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
    +
    +    if (!group_by_key_ids_.empty()) {
    +      // Aggregation with GROUP BY: combined payload is partially updated in
    +      // the presence of DISTINCT.
           if (*is_distinct_it) {
    -        std::vector<const Type *> key_types(group_by_types);
    -        key_types.insert(
    -            key_types.end(), argument_types.begin(), argument_types.end());
    -        // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    -        // estimating the number of entries in the distinctify hash table.
    -        // We may estimate for each distinct aggregation an
    -        // estimated_num_distinct_keys value during query optimization, if it's
    -        // worth.
    -        distinctify_hashtables_.emplace_back(
    -            AggregationStateFastHashTableFactory::CreateResizable(
    -                *distinctify_hash_table_impl_types_it,
    -                key_types,
    -                estimated_num_entries,
    -                {0},
    -                {},
    -                storage_manager));
    -        ++distinctify_hash_table_impl_types_it;
    -      } else {
    -        distinctify_hashtables_.emplace_back(nullptr);
    +        handles_.back()->blockUpdate();
           }
    +      group_by_handles.emplace_back(handles_.back().get());
    +    } else {
    +      // Aggregation without GROUP BY: create a single global state.
    +      single_states_.emplace_back(handles_.back()->createInitialState());
         }
     
    -    if (!group_by_handles.empty()) {
    -      // Aggregation with GROUP BY: create a HashTable pool.
    -      if (!is_aggregate_partitioned_) {
    -        group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
    -                                                         hash_table_impl_type,
    -                                                         group_by_types,
    -                                                         payload_sizes,
    -                                                         group_by_handles,
    -                                                         storage_manager));
    -      } else {
    -        partitioned_group_by_hashtable_pool_.reset(
    -            new PartitionedHashTablePool(estimated_num_entries,
    -                                         FLAGS_num_aggregation_partitions,
    -                                         hash_table_impl_type,
    -                                         group_by_types,
    -                                         payload_sizes,
    -                                         group_by_handles,
    -                                         storage_manager));
    -      }
    +    // Initialize the corresponding distinctify hash table if this is a
    +    // DISTINCT aggregation.
    +    if (*is_distinct_it) {
    +      std::vector<const Type *> key_types(group_by_types_);
    +      key_types.insert(
    +          key_types.end(), argument_types.begin(), argument_types.end());
    +      // TODO(jianqiao): estimated_num_entries is quite inaccurate for
    +      // estimating the number of entries in the distinctify hash table.
    +      // We need to estimate for each distinct aggregation an
    +      // estimated_num_distinct_keys value during query optimization.
    +      distinctify_hashtables_.emplace_back(
    +          AggregationStateHashTableFactory::CreateResizable(
    +              *distinctify_hash_table_impl_types_it,
    +              key_types,
    +              estimated_num_entries,
    +              {},
    --- End diff --
    
    Please add comments regarding this parameter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99867096
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    --- End diff --
    
    Yes we will have a followup PR to improve TPC-H Q01.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep issue #179: QUICKSTEP-70-71 Improve aggregation performa...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on the issue:

    https://github.com/apache/incubator-quickstep/pull/179
  
    Just rebased on master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99511739
  
    --- Diff: storage/AggregationOperationState.hpp ---
    @@ -156,6 +152,29 @@ class AggregationOperationState {
           const CatalogDatabaseLite &database);
     
       /**
    +   * @brief Get the number of partitions to be used for initializing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for initializing the aggregation.
    +   **/
    +  std::size_t getNumInitializationPartitions() const;
    +
    +  /**
    +   * @brief Get the number of partitions to be used for finalizing the
    +   *        aggregation.
    +   *
    +   * @return The number of partitions to be used for finalizing the aggregation.
    +   **/
    +  std::size_t getNumFinalizationPartitions() const;
    --- End diff --
    
    I was wondering why `getNumInitializationPartitions` may be different than `getNumFinalizationPartitions`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99500588
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -1495,9 +1607,28 @@ void ExecutionGenerator::convertAggregate(
       }
     
       if (!group_by_types.empty()) {
    -    // Right now, only SeparateChaining is supported.
    -    aggr_state_proto->set_hash_table_impl_type(
    -        serialization::HashTableImplType::SEPARATE_CHAINING);
    +    const std::size_t estimated_num_groups =
    +        cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
    +
    +    std::size_t max_num_groups;
    +    const bool can_use_collision_free_aggregation =
    +        canUseCollisionFreeAggregation(physical_plan,
    +                                       estimated_num_groups,
    +                                       &max_num_groups);
    +
    +    if (can_use_collision_free_aggregation) {
    --- End diff --
    
    Minor, we actually don't need this extra bool variable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99882633
  
    --- Diff: query_optimizer/ExecutionGenerator.cpp ---
    @@ -371,6 +378,109 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
       }
     }
     
    +bool ExecutionGenerator::canUseCollisionFreeAggregation(
    +    const P::AggregatePtr &aggregate,
    +    const std::size_t estimated_num_groups,
    +    std::size_t *max_num_groups) const {
    +#ifdef QUICKSTEP_DISTRIBUTED
    +  // Currently we cannot do this fast path with the distributed setting. See
    +  // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
    +  // FinalizeAggregationOperator::getAllWorkOrderProtos().
    +  return false;
    +#endif
    +
    +  // Supports only single group-by key.
    +  if (aggregate->grouping_expressions().size() != 1) {
    +    return false;
    +  }
    +
    +  // We need to know the exact min/max stats of the group-by key.
    +  // So it must be a CatalogAttribute (but not an expression).
    +  E::AttributeReferencePtr group_by_key_attr;
    +  const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
    +  if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
    +    return false;
    +  }
    +
    +  bool min_value_stat_is_exact;
    +  bool max_value_stat_is_exact;
    +  const TypedValue min_value =
    +      cost_model_for_aggregation_->findMinValueStat(
    +          aggregate, group_by_key_attr, &min_value_stat_is_exact);
    +  const TypedValue max_value =
    +      cost_model_for_aggregation_->findMaxValueStat(
    +          aggregate, group_by_key_attr, &max_value_stat_is_exact);
    +  if (min_value.isNull() || max_value.isNull() ||
    +      (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
    +    return false;
    +  }
    +
    +  std::int64_t min_cpp_value;
    +  std::int64_t max_cpp_value;
    +  switch (group_by_key_attr->getValueType().getTypeID()) {
    +    case TypeID::kInt: {
    +      min_cpp_value = min_value.getLiteral<int>();
    +      max_cpp_value = max_value.getLiteral<int>();
    +      break;
    +    }
    +    case TypeID::kLong: {
    +      min_cpp_value = min_value.getLiteral<std::int64_t>();
    +      max_cpp_value = max_value.getLiteral<std::int64_t>();
    +      break;
    +    }
    +    default:
    +      return false;
    +  }
    +
    +  // TODO(jianqiao):
    +  // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
    +  // 2. Reason about the table size bound (e.g. by checking memory size) instead
    +  //    of hardcoding it as a gflag.
    +  if (min_cpp_value < 0 ||
    +      max_cpp_value >= FLAGS_collision_free_vector_table_max_size ||
    +      max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
    +    return false;
    +  }
    +
    +  for (const auto &agg_expr : aggregate->aggregate_expressions()) {
    +    const E::AggregateFunctionPtr agg_func =
    +        std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
    +
    +    if (agg_func->is_distinct()) {
    +      return false;
    +    }
    +
    +    // TODO(jianqiao): Support AggregationID::AVG.
    +    switch (agg_func->getAggregate().getAggregationID()) {
    +      case AggregationID::kCount:  // Fall through
    +      case AggregationID::kSum:
    +        break;
    +      default:
    +        return false;
    +    }
    +
    +    const auto &arguments = agg_func->getArguments();
    +    if (arguments.size() > 1) {
    +      return false;
    +    }
    +
    +    if (arguments.size() == 1) {
    +      switch (arguments.front()->getValueType().getTypeID()) {
    --- End diff --
    
    Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99929968
  
    --- Diff: relational_operators/WorkOrderFactory.cpp ---
    @@ -186,6 +186,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           LOG(INFO) << "Creating FinalizeAggregationWorkOrder in Shiftboss " << shiftboss_index;
           return new FinalizeAggregationWorkOrder(
               proto.query_id(),
    +          0uL,
    --- End diff --
    
    Added a TODO comment:
    
    ```
    // TODO(quickstep-team): Handle inner-table partitioning in the distributed setting.
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99867160
  
    --- Diff: expressions/aggregation/AggregationConcreteHandle.hpp ---
    @@ -61,7 +61,7 @@ class HashTableStateUpserterFast {
        *        table. The corresponding state (for the same key) in the destination
        *        hash table will be upserted.
        **/
    -  HashTableStateUpserterFast(const HandleT &handle,
    +  HashTableStateUpserter(const HandleT &handle,
                                  const std::uint8_t *source_state)
    --- End diff --
    
    Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by zuyu <gi...@git.apache.org>.
Github user zuyu commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99499809
  
    --- Diff: expressions/aggregation/AggregationConcreteHandle.hpp ---
    @@ -61,7 +61,7 @@ class HashTableStateUpserterFast {
        *        table. The corresponding state (for the same key) in the destination
        *        hash table will be upserted.
        **/
    -  HashTableStateUpserterFast(const HandleT &handle,
    +  HashTableStateUpserter(const HandleT &handle,
                                  const std::uint8_t *source_state)
    --- End diff --
    
    Align with the line above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99928307
  
    --- Diff: storage/AggregationOperationState.cpp ---
    @@ -353,187 +353,286 @@ bool AggregationOperationState::ProtoIsValid(
       return true;
     }
     
    -void AggregationOperationState::aggregateBlock(const block_id input_block,
    -                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  if (group_by_list_.empty()) {
    -    aggregateBlockSingleState(input_block);
    -  } else {
    -    aggregateBlockHashTable(input_block, lip_filter_adaptive_prober);
    +bool AggregationOperationState::checkAggregatePartitioned(
    +    const std::size_t estimated_num_groups,
    +    const std::vector<bool> &is_distinct,
    +    const std::vector<std::unique_ptr<const Scalar>> &group_by,
    +    const std::vector<const AggregateFunction *> &aggregate_functions) const {
    +  // If there's no aggregation, return false.
    +  if (aggregate_functions.empty()) {
    +    return false;
    +  }
    +  // Check if there's a distinct operation involved in any aggregate, if so
    +  // the aggregate can't be partitioned.
    +  for (auto distinct : is_distinct) {
    +    if (distinct) {
    +      return false;
    +    }
    +  }
    +  // There's no distinct aggregation involved, Check if there's at least one
    +  // GROUP BY operation.
    +  if (group_by.empty()) {
    +    return false;
    +  }
    +
    +  // Currently we require that all the group-by keys are ScalarAttributes for
    +  // the convenient of implementing copy elision.
    +  // TODO(jianqiao): relax this requirement.
    +  for (const auto &group_by_element : group_by) {
    +    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
    +      return false;
    +    }
       }
    +
    +  // There are GROUP BYs without DISTINCT. Check if the estimated number of
    +  // groups is large enough to warrant a partitioned aggregation.
    +  return estimated_num_groups >
    +         static_cast<std::size_t>(
    +             FLAGS_partition_aggregation_num_groups_threshold);
    +  return false;
     }
     
    -void AggregationOperationState::finalizeAggregate(
    -    InsertDestination *output_destination) {
    -  if (group_by_list_.empty()) {
    -    finalizeSingleState(output_destination);
    +std::size_t AggregationOperationState::getNumInitializationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumInitializationPartitions();
       } else {
    -    finalizeHashTable(output_destination);
    +    return 0u;
       }
     }
     
    -void AggregationOperationState::mergeSingleState(
    -    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    -  DEBUG_ASSERT(local_state.size() == single_states_.size());
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    if (!is_distinct_[agg_idx]) {
    -      handles_[agg_idx]->mergeStates(*local_state[agg_idx],
    -                                     single_states_[agg_idx].get());
    -    }
    +std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
    +  if (is_aggregate_collision_free_) {
    +    return static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->getNumFinalizationPartitions();
    +  } else if (is_aggregate_partitioned_) {
    +    return partitioned_group_by_hashtable_pool_->getNumPartitions();
    +  } else  {
    +    return 1u;
       }
     }
     
    -void AggregationOperationState::aggregateBlockSingleState(
    -    const block_id input_block) {
    -  // Aggregate per-block state for each aggregate.
    -  std::vector<std::unique_ptr<AggregationState>> local_state;
    +void AggregationOperationState::initialize(const std::size_t partition_id) {
    +  if (is_aggregate_collision_free_) {
    +    static_cast<CollisionFreeVectorTable *>(
    +        collision_free_hashtable_.get())->initialize(partition_id);
    +  } else {
    +    LOG(FATAL) << "AggregationOperationState::initializeState() "
    +               << "is not supported by this aggregation";
    +  }
    +}
     
    +void AggregationOperationState::aggregateBlock(const block_id input_block,
    +                                               LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
       BlockReference block(
           storage_manager_->getBlock(input_block, input_relation_));
    +  const auto &tuple_store = block->getTupleStorageSubBlock();
    +  std::unique_ptr<ValueAccessor> base_accessor(tuple_store.createValueAccessor());
    +  std::unique_ptr<ValueAccessor> shared_accessor;
    +  ValueAccessor *accessor = base_accessor.get();
     
    +  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
    +  // as the existence map for the tuples.
       std::unique_ptr<TupleIdSequence> matches;
       if (predicate_ != nullptr) {
    -    std::unique_ptr<ValueAccessor> accessor(
    -        block->getTupleStorageSubBlock().createValueAccessor());
    -    matches.reset(block->getMatchesForPredicate(predicate_.get(), matches.get()));
    +    matches.reset(block->getMatchesForPredicate(predicate_.get()));
    +    shared_accessor.reset(
    +        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
    +    accessor = shared_accessor.get();
    +  }
    +  if (lip_filter_adaptive_prober != nullptr) {
    +    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor));
    +    shared_accessor.reset(
    +        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
    +    accessor = shared_accessor.get();
       }
     
    -  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    -    const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
    -#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
    -    // If all arguments are attributes of the input relation, elide a copy.
    -    if (!arguments_as_attributes_[agg_idx].empty()) {
    -      local_arguments_as_attributes = &(arguments_as_attributes_[agg_idx]);
    +  std::unique_ptr<ColumnVectorsValueAccessor> non_trivial_results;
    +  if (!non_trivial_expressions_.empty()) {
    +    non_trivial_results.reset(new ColumnVectorsValueAccessor());
    +    SubBlocksReference sub_blocks_ref(tuple_store,
    +                                      block->getIndices(),
    +                                      block->getIndicesConsistent());
    +    for (const auto &expression : non_trivial_expressions_) {
    +      non_trivial_results->addColumn(
    +          expression->getAllValues(accessor, &sub_blocks_ref));
         }
    -#endif
    +  }
    +
    +  accessor->beginIterationVirtual();
    +
    +  ValueAccessorMultiplexer accessor_mux(accessor, non_trivial_results.get());
    +  if (group_by_key_ids_.empty()) {
    +    aggregateBlockSingleState(accessor_mux);
    +  } else {
    +    aggregateBlockHashTable(accessor_mux);
    +  }
    +}
    +
    +void AggregationOperationState::aggregateBlockSingleState(
    +    const ValueAccessorMultiplexer &accessor_mux) {
    +  // Aggregate per-block state for each aggregate.
    +  std::vector<std::unique_ptr<AggregationState>> local_state;
    +
    +  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
    +    const auto &argument_ids = argument_ids_[agg_idx];
    +    const auto &handle = handles_[agg_idx];
    +
    +    AggregationState *state = nullptr;
         if (is_distinct_[agg_idx]) {
    -      // Call StorageBlock::aggregateDistinct() to put the arguments as keys
    -      // directly into the (threadsafe) shared global distinctify HashTable
    -      // for this aggregate.
    -      block->aggregateDistinct(*handles_[agg_idx],
    -                               arguments_[agg_idx],
    -                               local_arguments_as_attributes,
    -                               {}, /* group_by */
    -                               matches.get(),
    -                               distinctify_hashtables_[agg_idx].get(),
    -                               nullptr /* reuse_group_by_vectors */);
    -      local_state.emplace_back(nullptr);
    +      handle->insertValueAccessorIntoDistinctifyHashTable(
    +          argument_ids,
    +          {},
    +          accessor_mux,
    +          distinctify_hashtables_[agg_idx].get());
         } else {
    -      // Call StorageBlock::aggregate() to actually do the aggregation.
    -      local_state.emplace_back(block->aggregate(*handles_[agg_idx],
    -                                                arguments_[agg_idx],
    -                                                local_arguments_as_attributes,
    -                                                matches.get()));
    +      if (argument_ids.empty()) {
    +        // Special case. This is a nullary aggregate (i.e. COUNT(*)).
    +        ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
    +        DCHECK(base_accessor != nullptr);
    +        state = handle->accumulateNullary(base_accessor->getNumTuplesVirtual());
    +      } else {
    +        // Have the AggregationHandle actually do the aggregation.
    +        state = handle->accumulateValueAccessor(argument_ids, accessor_mux);
    +      }
         }
    +    local_state.emplace_back(state);
       }
     
       // Merge per-block aggregation states back with global state.
       mergeSingleState(local_state);
     }
     
    -void AggregationOperationState::aggregateBlockHashTable(
    -    const block_id input_block,
    -    LIPFilterAdaptiveProber *lip_filter_adaptive_prober) {
    -  BlockReference block(
    -      storage_manager_->getBlock(input_block, input_relation_));
    -
    -  // Apply the predicate first, then the LIPFilters, to generate a TupleIdSequence
    -  // as the existence map for the tuples.
    -  std::unique_ptr<TupleIdSequence> matches;
    -  if (predicate_ != nullptr) {
    -    matches.reset(block->getMatchesForPredicate(predicate_.get()));
    -  }
    -  if (lip_filter_adaptive_prober != nullptr) {
    -    std::unique_ptr<ValueAccessor> accessor(
    -        block->getTupleStorageSubBlock().createValueAccessor(matches.get()));
    -    matches.reset(lip_filter_adaptive_prober->filterValueAccessor(accessor.get()));
    -  }
    -
    -  // This holds values of all the GROUP BY attributes so that the can be reused
    -  // across multiple aggregates (i.e. we only pay the cost of evaluatin the
    -  // GROUP BY expressions once).
    -  std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
    -
    +void AggregationOperationState::mergeSingleState(
    +    const std::vector<std::unique_ptr<AggregationState>> &local_state) {
    +  DEBUG_ASSERT(local_state.size() == single_states_.size());
    --- End diff --
    
    Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-quickstep pull request #179: QUICKSTEP-70-71 Improve aggregation p...

Posted by jianqiao <gi...@git.apache.org>.
Github user jianqiao commented on a diff in the pull request:

    https://github.com/apache/incubator-quickstep/pull/179#discussion_r99927064
  
    --- Diff: relational_operators/InitializeAggregationOperator.cpp ---
    @@ -0,0 +1,72 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + **/
    +
    +#include "relational_operators/InitializeAggregationOperator.hpp"
    +
    +#include <cstddef>
    +
    +#include "query_execution/QueryContext.hpp"
    +#include "query_execution/WorkOrderProtosContainer.hpp"
    +#include "query_execution/WorkOrdersContainer.hpp"
    +#include "relational_operators/WorkOrder.pb.h"
    +#include "storage/AggregationOperationState.hpp"
    +
    +#include "glog/logging.h"
    +
    +#include "tmb/id_typedefs.h"
    +
    +namespace quickstep {
    +
    +bool InitializeAggregationOperator::getAllWorkOrders(
    +    WorkOrdersContainer *container,
    +    QueryContext *query_context,
    +    StorageManager *storage_manager,
    +    const tmb::client_id scheduler_client_id,
    +    tmb::MessageBus *bus) {
    +  if (!started_) {
    +    AggregationOperationState *agg_state =
    +        query_context->getAggregationState(aggr_state_index_);
    +    DCHECK(agg_state != nullptr);
    +
    +    for (std::size_t part_id = 0;
    +         part_id < agg_state->getNumInitializationPartitions();
    +         ++part_id) {
    +      container->addNormalWorkOrder(
    +          new InitializeAggregationWorkOrder(query_id_,
    +                                             part_id,
    +                                             agg_state),
    +          op_index_);
    +    }
    +    started_ = true;
    +  }
    +  return true;
    +}
    +
    +// TODO(quickstep-team) : Think about how the number of partitions could be
    +// accessed in this function. Until then, we can't use partitioned aggregation
    +// initialization with the distributed version.
    --- End diff --
    
    Currently no. For initialization, say we have some the allocated table memory pointer as `m` with size `40MB`, then we just create 10 work orders, so that work order `i` do `memset(m + 4M * i, 0, 4M)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---