You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/01 03:52:13 UTC

[1/2] incubator-quickstep git commit: Updates for distinct

Repository: incubator-quickstep
Updated Branches:
  refs/heads/collision-free-agg 34ea858db -> 963a60428


Updates for distinct


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/3fc85b28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/3fc85b28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/3fc85b28

Branch: refs/heads/collision-free-agg
Commit: 3fc85b28909fe49635325ea70d52d234ae2c957d
Parents: 34ea858
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue Jan 31 21:05:13 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Jan 31 21:05:13 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregationConcreteHandle.cpp   |  44 ++++++++
 .../aggregation/AggregationConcreteHandle.hpp   | 108 ++++++++++++++++--
 expressions/aggregation/AggregationHandle.hpp   | 111 ++++++++++++++++++-
 .../aggregation/AggregationHandleDistinct.cpp   |   4 +-
 .../aggregation/AggregationHandleSum.cpp        |  17 +++
 .../aggregation/AggregationHandleSum.hpp        |  21 ++++
 storage/AggregationOperationState.cpp           |  13 +--
 .../PackedPayloadAggregationStateHashTable.hpp  |  94 +++++++++++++++-
 8 files changed, 381 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationConcreteHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.cpp b/expressions/aggregation/AggregationConcreteHandle.cpp
index 3151a91..c3d133a 100644
--- a/expressions/aggregation/AggregationConcreteHandle.cpp
+++ b/expressions/aggregation/AggregationConcreteHandle.cpp
@@ -19,6 +19,50 @@
 
 #include "expressions/aggregation/AggregationConcreteHandle.hpp"
 
+#include <cstddef>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/HashTable.hpp"
+#include "storage/HashTableFactory.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
+
 namespace quickstep {
 
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+AggregationStateHashTableBase* AggregationConcreteHandle::createDistinctifyHashTable(
+    const HashTableImplType hash_table_impl,
+    const std::vector<const Type*> &key_types,
+    const std::size_t estimated_num_distinct_keys,
+    StorageManager *storage_manager) const {
+  // Keyed on key_types with an empty handle list ({}): acts as a key set.
+  return AggregationStateHashTableFactory::CreateResizable(
+      hash_table_impl,
+      key_types,
+      estimated_num_distinct_keys,
+      {},
+      storage_manager);
+}
+
+void AggregationConcreteHandle::insertValueAccessorIntoDistinctifyHashTable(
+    ValueAccessor *accessor,
+    const std::vector<attribute_id> &key_ids,
+    AggregationStateHashTableBase *distinctify_hash_table) const {
+  // NOTE(review): the implementation below is commented out, so this method
+  // is currently a no-op and leaves distinctify_hash_table unchanged.
+//  AggregationStateFastHashTable *hash_table =
+//      static_cast<AggregationStateFastHashTable *>(distinctify_hash_table);
+//  if (key_ids.size() == 1) {
+//    hash_table->upsertValueAccessorFast(
+//        key_ids, accessor, key_ids[0], true /* check_for_null_keys */);
+//  } else {
+//    std::vector<attribute_id> empty_args {kInvalidAttributeID};
+//    hash_table->upsertValueAccessorCompositeKeyFast(
+//        empty_args, accessor, key_ids, true /* check_for_null_keys */);
+//  }
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationConcreteHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationConcreteHandle.hpp b/expressions/aggregation/AggregationConcreteHandle.hpp
index 93e9bd0..04be232 100644
--- a/expressions/aggregation/AggregationConcreteHandle.hpp
+++ b/expressions/aggregation/AggregationConcreteHandle.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_CONCRETE_HANDLE_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <utility>
 #include <vector>
 
@@ -28,6 +29,7 @@
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "storage/HashTableBase.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
 #include "threading/SpinMutex.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
@@ -101,10 +103,39 @@ class AggregationConcreteHandle : public AggregationHandle {
                << "takes at least one argument.";
   }
 
+  AggregationStateHashTableBase* createDistinctifyHashTable(
+      const HashTableImplType hash_table_impl,
+      const std::vector<const Type *> &key_types,
+      const std::size_t estimated_num_distinct_keys,
+      StorageManager *storage_manager) const override;
+
+  void insertValueAccessorIntoDistinctifyHashTable(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_ids,
+      AggregationStateHashTableBase *distinctify_hash_table) const override;
+
+  void blockUpdate() override {
+    block_update_ = true;
+  }
+
+  void allowUpdate() override {
+    block_update_ = false;
+  }
+
  protected:
   AggregationConcreteHandle(const AggregationID agg_id)
       : AggregationHandle(agg_id) {}
 
+  template <typename HandleT, typename StateT>
+  StateT* aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+      const AggregationStateHashTableBase &distinctify_hash_table) const;
+
+  template <typename HandleT, typename HashTableT>
+  void aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+      const AggregationStateHashTableBase &distinctify_hash_table,
+      AggregationStateHashTableBase *hash_table,
+      std::size_t index) const;
+
   template <typename HandleT, typename HashTableT>
   ColumnVector* finalizeHashTableHelper(
       const Type &result_type,
@@ -125,6 +156,8 @@ class AggregationConcreteHandle : public AggregationHandle {
         group_state);
   }
 
+  bool block_update_ = false;  // Toggled by blockUpdate() / allowUpdate().
+
  private:
   DISALLOW_COPY_AND_ASSIGN(AggregationConcreteHandle);
 };
@@ -148,14 +181,7 @@ class HashTableAggregateFinalizer {
         output_column_vector_(output_column_vector) {}
 
   inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const AggregationState &group_state) {
-    group_by_keys_->emplace_back(group_by_key);
-    output_column_vector_->appendTypedValue(
-        handle_.finalizeHashTableEntry(group_state));
-  }
-
-  inline void operator()(const std::vector<TypedValue> &group_by_key,
-                         const unsigned char *byte_ptr) {
+                         const std::uint8_t *byte_ptr) {
     group_by_keys_->emplace_back(group_by_key);
     output_column_vector_->appendTypedValue(
         handle_.finalizeHashTableEntry(byte_ptr));
@@ -172,6 +198,68 @@ class HashTableAggregateFinalizer {
 // ----------------------------------------------------------------------------
 // Implementations of templated methods follow:
 
+template <typename HandleT, typename StateT>
+StateT* AggregationConcreteHandle::
+    aggregateOnDistinctifyHashTableForSingleUnaryHelperFast(
+        const AggregationStateHashTableBase &distinctify_hash_table) const {
+  const HandleT &handle = static_cast<const HandleT &>(*this);
+  StateT *state = static_cast<StateT *>(createInitialState());
+
+  // Called once per distinct key in the distinctify hash table; the value
+  // pointer is unused because only the keys matter here.
+  const auto aggregate_functor = [&handle, &state](
+      const TypedValue &key, const std::uint8_t *dumb_placeholder) {
+    // For each (unary) key in the distinctify hash table, aggregate the key
+    // into "state".
+    handle.iterateUnaryInl(key, reinterpret_cast<std::uint8_t *>(state));
+  };
+
+  const auto &hash_table =
+      static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
+          distinctify_hash_table);
+  // Invoke the lambda function "aggregate_functor" on each key from the
+  // distinctify hash table.
+  hash_table.forEach(&aggregate_functor);
+
+  return state;
+}
+
+template <typename HandleT, typename HashTableT>
+void AggregationConcreteHandle::
+    aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast(
+        const AggregationStateHashTableBase &distinctify_hash_table,
+        AggregationStateHashTableBase *aggregation_hash_table,
+        std::size_t index) const {
+  const HandleT &handle = static_cast<const HandleT &>(*this);
+  HashTableT *target_hash_table =
+      static_cast<HashTableT *>(aggregation_hash_table);
+
+  // A lambda function which will be called on each key-value pair from the
+  // distinctify hash table.
+  const auto aggregate_functor = [&handle, &target_hash_table, &index](
+      std::vector<TypedValue> &key, const std::uint8_t *dumb_placeholder) {
+    // Each composite key in the distinctify hash table has N entries: the
+    // first N-1 are the GROUP BY values and the last is the (distinct)
+    // argument to aggregate. Split it into group key and argument here.
+    const TypedValue argument(std::move(key.back()));
+    key.pop_back();
+
+    // An upserter as lambda function for aggregating the argument into its
+    // GROUP BY group's entry inside aggregation_hash_table.
+    const auto upserter = [&handle, &argument](std::uint8_t *state) {
+      handle.iterateUnaryInl(argument, state);
+    };
+
+    target_hash_table->upsertCompositeKey(key, &upserter, index);
+  };
+
+  const HashTableT &source_hash_table =
+      static_cast<const HashTableT &>(distinctify_hash_table);
+  // Invoke the lambda function "aggregate_functor" on each composite key vector
+  // from the distinctify hash table.
+  source_hash_table.forEachCompositeKey(&aggregate_functor);
+}
+
 template <typename HandleT, typename HashTableT>
 ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
     const Type &result_type,
@@ -188,14 +276,14 @@ ColumnVector* AggregationConcreteHandle::finalizeHashTableHelper(
           new NativeColumnVector(result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, NativeColumnVector> finalizer(
           handle, group_by_keys, result);
-      hash_table_concrete.forEach(&finalizer, index);
+      hash_table_concrete.forEachCompositeKey(&finalizer, index);
       return result;
     } else {
       IndirectColumnVector *result = new IndirectColumnVector(
           result_type, hash_table_concrete.numEntries());
       HashTableAggregateFinalizer<HandleT, IndirectColumnVector> finalizer(
           handle, group_by_keys, result);
-      hash_table_concrete.forEach(&finalizer, index);
+      hash_table_concrete.forEachCompositeKey(&finalizer, index);
       return result;
     }
   } else {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandle.hpp b/expressions/aggregation/AggregationHandle.hpp
index 8e2aea6..bc9c27f 100644
--- a/expressions/aggregation/AggregationHandle.hpp
+++ b/expressions/aggregation/AggregationHandle.hpp
@@ -209,6 +209,97 @@ class AggregationHandle {
       int index) const = 0;
 
   /**
+   * @brief Create a new HashTable for the distinctify step for DISTINCT
+   * aggregation.
+   *
+   * Distinctify is the first step for DISTINCT aggregation. This step inserts
+   * the GROUP BY expression values and aggregation arguments together as keys
+   * into the distinctify hash table, so that arguments are distinctified within
+   * each GROUP BY group. Later, a second-round aggregation on the distinctify
+   * hash table will be performed to actually compute the aggregated result for
+   * each GROUP BY group.
+   *
+   * In the case of single aggregation where there is no GROUP BY expressions,
+   * we simply treat it as a special GROUP BY case that the GROUP BY expression
+   * vector is empty.
+   *
+   * @param hash_table_impl The choice of which concrete HashTable
+   *        implementation to use.
+   * @param key_types The types of the GROUP BY expressions together with the
+   *        types of the aggregation arguments.
+   * @param estimated_num_distinct_keys The estimated number of distinct keys
+   *        (i.e. GROUP BY expressions together with aggregation arguments) for
+   *        the distinctify step. This is used to size the initial HashTable.
+   *        This is an estimate only, and the HashTable will be resized if it
+   *        becomes over-full.
+   * @param storage_manager The StorageManager to use to create the HashTable.
+   *        A StorageBlob will be allocated to serve as the HashTable's
+   *        in-memory storage.
+   *
+   * @return A new HashTable instance with the appropriate state type for this
+   *         aggregate.
+   */
+  virtual AggregationStateHashTableBase* createDistinctifyHashTable(
+      const HashTableImplType hash_table_impl,
+      const std::vector<const Type *> &key_types,
+      const std::size_t estimated_num_distinct_keys,
+      StorageManager *storage_manager) const = 0;
+
+  /**
+   * @brief Inserts the GROUP BY expressions and aggregation arguments together
+   * as keys into the distinctify hash table.
+   *
+   * @param accessor The ValueAccessor that will be iterated over to read
+   *        tuples.
+   * @param key_ids The attribute_ids of the GROUP BY expressions in accessor
+   *        together with the attribute_ids of the arguments to this aggregate
+   *        in accessor, in order.
+   * @param distinctify_hash_table The HashTable to store the GROUP BY
+   *        expressions and the aggregation arguments together as hash table
+   *        keys and a bool constant \c true as hash table value (So the hash
+   *        table actually serves as a hash set). This should have been created
+   *        by calling createDistinctifyHashTable();
+   */
+  virtual void insertValueAccessorIntoDistinctifyHashTable(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_ids,
+      AggregationStateHashTableBase *distinctify_hash_table) const = 0;
+
+  /**
+   * @brief Perform single (i.e. without GROUP BY) aggregation on the keys from
+   * the distinctify hash table to actually compute the aggregated results.
+   *
+   * @param distinctify_hash_table Hash table which stores the distinctified
+   *        aggregation arguments as hash table keys. This should have been
+   *        created by calling createDistinctifyHashTable();
+   * @return A new AggregationState which contains the aggregated results from
+   *         applying the aggregate to the distinctify hash table.
+   *         Caller is responsible for deleting the returned AggregationState.
+   */
+  virtual AggregationState* aggregateOnDistinctifyHashTableForSingle(
+      const AggregationStateHashTableBase &distinctify_hash_table) const {
+    return nullptr;
+  }
+
+  /**
+   * @brief Perform GROUP BY aggregation on the keys from the distinctify hash
+   * table and upserts states into the aggregation hash table.
+   *
+   * @param distinctify_hash_table Hash table which stores the GROUP BY
+   *        expression values and aggregation arguments together as hash table
+   *        keys.
+   * @param aggregation_hash_table The HashTable to upsert AggregationStates in.
+   *        This should have been created by calling createGroupByHashTable() on
+   *        this same AggregationHandle.
+   * @param index The index of the distinctify hash table for which we perform
+   *        the DISTINCT aggregation.
+   */
+  virtual void aggregateOnDistinctifyHashTableForGroupBy(
+      const AggregationStateHashTableBase &distinctify_hash_table,
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const {}
+
+  /**
    * @brief Get the number of bytes needed to store the aggregation handle's
    *        state.
    **/
@@ -236,7 +327,7 @@ class AggregationHandle {
    * @param byte_ptr The pointer where the aggregation state is stored.
    **/
   virtual void updateStateUnary(const TypedValue &argument,
-                                std::uint8_t *byte_ptr) const {}
+                                std::uint8_t *byte_ptr) const = 0;
 
   /**
    * @brief Merge two aggregation states for this aggregation handle.
@@ -248,7 +339,7 @@ class AggregationHandle {
    * @param dst A pointer to the destination aggregation state.
    **/
   virtual void mergeStates(const std::uint8_t *src,
-                           std::uint8_t *dst) const {}
+                           std::uint8_t *dst) const = 0;
 
   /**
    * @brief Initialize the payload (in the aggregation hash table) for the given
@@ -256,7 +347,7 @@ class AggregationHandle {
    *
    * @param byte_ptr The pointer to the aggregation state in the hash table.
    **/
-  virtual void initPayload(std::uint8_t *byte_ptr) const {}
+  virtual void initPayload(std::uint8_t *byte_ptr) const = 0;
 
   /**
    * @brief Destroy the payload (in the aggregation hash table) for the given
@@ -264,7 +355,19 @@ class AggregationHandle {
    *
    * @param byte_ptr The pointer to the aggregation state in the hash table.
    **/
-  virtual void destroyPayload(std::uint8_t *byte_ptr) const {}
+  virtual void destroyPayload(std::uint8_t *byte_ptr) const {}
+
+  /**
+   * @brief Inform the aggregation handle to block (prohibit) updates on the
+   *        aggregation state.
+   **/
+  virtual void blockUpdate() = 0;
+
+  /**
+   * @brief Inform the aggregation handle to allow updates on the
+   *        aggregation state.
+   **/
+  virtual void allowUpdate() = 0;
 
  protected:
   AggregationHandle(const AggregationID agg_id)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
index c6c47c7..1886335 100644
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ b/expressions/aggregation/AggregationHandleDistinct.cpp
@@ -41,11 +41,11 @@ ColumnVector* AggregationHandleDistinct::finalizeHashTable(
   DCHECK(group_by_keys->empty());
 
   const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
-                                               const bool &dumb_placeholder) -> void {
+                                               const std::uint8_t *dumb_placeholder) -> void {
     group_by_keys->emplace_back(std::move(group_by_key));
   };
   static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
-      hash_table).forEach(&keys_retriever);
+      hash_table).forEachCompositeKey(&keys_retriever);
 
   return nullptr;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 00b229e..29a986f 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -150,4 +150,21 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
           *result_type_, hash_table, group_by_keys, index);
 }
 
+AggregationState* AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
+    const AggregationStateHashTableBase &distinctify_hash_table) const {
+  return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
+      AggregationHandleSum,
+      AggregationStateSum>(distinctify_hash_table);
+}
+
+void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
+    const AggregationStateHashTableBase &distinctify_hash_table,
+    AggregationStateHashTableBase *aggregation_hash_table,
+    std::size_t index) const {
+  aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+      AggregationHandleSum,
+      PackedPayloadSeparateChainingAggregationStateHashTable>(
+          distinctify_hash_table, aggregation_hash_table, index);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index 9fb7706..cdcec4b 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -123,6 +123,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
     state->null_ = false;
   }
 
+  inline void iterateUnaryInl(const TypedValue &value,
+                              std::uint8_t *byte_ptr) const {
+    DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
+    if (value.isNull()) return;
+    TypedValue *sum_ptr =
+        reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+    bool *null_ptr =
+        reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+    *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
+    *null_ptr = false;
+  }
+
   AggregationState* accumulate(
       ValueAccessor *accessor,
       ColumnVectorsValueAccessor *aux_accessor,
@@ -178,6 +190,15 @@ class AggregationHandleSum : public AggregationConcreteHandle {
       std::vector<std::vector<TypedValue>> *group_by_keys,
       int index) const override;
 
+  AggregationState* aggregateOnDistinctifyHashTableForSingle(
+      const AggregationStateHashTableBase &distinctify_hash_table)
+      const override;
+
+  void aggregateOnDistinctifyHashTableForGroupBy(
+      const AggregationStateHashTableBase &distinctify_hash_table,
+      AggregationStateHashTableBase *aggregation_hash_table,
+      std::size_t index) const override;
+
  private:
   friend class AggregateFunctionSum;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 5de2653..d04af81 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -123,12 +123,10 @@ AggregationOperationState::AggregationOperationState(
     // on the group-by expressions.
     DCHECK_GT(group_by_key_ids_.size(), 0u);
 
-    handles_.emplace_back(new AggregationHandleDistinct());
-    is_distinct_.emplace_back(false);
     group_by_hashtable_pool_.reset(new HashTablePool(estimated_num_entries,
                                                      hash_table_impl_type,
                                                      group_by_types_,
-                                                     {handles_.front().get()},
+                                                     {},
                                                      storage_manager));
   } else {
     std::vector<AggregationHandle *> group_by_handles;
@@ -471,9 +469,9 @@ void AggregationOperationState::mergeSingleState(
 
 void AggregationOperationState::mergeGroupByHashTables(
     AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) const {
-  HashTableMergerFast merger(dst);
+  HashTableMerger merger(dst);
   static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(src)
-      ->forEach(&merger);
+      ->forEachCompositeKey(&merger);
 }
 
 void AggregationOperationState::aggregateBlockHashTable(
@@ -662,11 +660,6 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
   complete_result.addColumn(key_cv.release());
 
   for (std::size_t i = 0; i < handles_.size(); ++i) {
-    if (handles_[i]->getAggregationID() == AggregationID::kDistinct) {
-      DCHECK_EQ(1u, handles_.size());
-      break;
-    }
-
     const Type *result_type = handles_[i]->getResultType();
     DCHECK(NativeColumnVector::UsableForType(*result_type));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3fc85b28/storage/PackedPayloadAggregationStateHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadAggregationStateHashTable.hpp b/storage/PackedPayloadAggregationStateHashTable.hpp
index 70152e7..8364bb4 100644
--- a/storage/PackedPayloadAggregationStateHashTable.hpp
+++ b/storage/PackedPayloadAggregationStateHashTable.hpp
@@ -89,6 +89,11 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
   inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
                                  const std::uint8_t *source_state);
 
+  template <typename FunctorT>
+  inline bool upsertCompositeKey(const std::vector<TypedValue> &key,
+                                 FunctorT *functor,
+                                 int index);
+
   inline const std::uint8_t* getSingleCompositeKey(
       const std::vector<TypedValue> &key) const;
 
@@ -102,6 +107,13 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
   template <typename FunctorT>
   inline std::size_t forEach(FunctorT *functor, const int index) const;
 
+  template <typename FunctorT>
+  inline std::size_t forEachCompositeKey(FunctorT *functor) const;
+
+  template <typename FunctorT>
+  inline std::size_t forEachCompositeKey(FunctorT *functor,
+                                         const int index) const;
+
  private:
   void resize(const std::size_t extra_buckets,
               const std::size_t extra_variable_storage,
@@ -118,6 +130,10 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
     return total;
   }
 
+  inline bool getNextEntry(TypedValue *key,
+                           const std::uint8_t **value,
+                           std::size_t *entry_num) const;
+
   inline bool getNextEntryCompositeKey(std::vector<TypedValue> *key,
                                        const std::uint8_t **value,
                                        std::size_t *entry_num) const;
@@ -301,7 +317,7 @@ class PackedPayloadSeparateChainingAggregationStateHashTable
 // ----------------------------------------------------------------------------
 // Implementations of template class methods follow.
 
-class HashTableMergerFast {
+class HashTableMerger {
  public:
   /**
    * @brief Constructor
@@ -310,7 +326,7 @@ class HashTableMergerFast {
    * @param destination_hash_table The destination hash table to which other
    *        hash tables will be merged.
    **/
-  explicit HashTableMergerFast(
+  explicit HashTableMerger(
       AggregationStateHashTableBase *destination_hash_table)
       : destination_hash_table_(
             static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
@@ -331,7 +347,7 @@ class HashTableMergerFast {
  private:
   PackedPayloadSeparateChainingAggregationStateHashTable *destination_hash_table_;
 
-  DISALLOW_COPY_AND_ASSIGN(HashTableMergerFast);
+  DISALLOW_COPY_AND_ASSIGN(HashTableMerger);
 };
 
 inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
@@ -348,6 +364,23 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
 }
 
 inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+    ::getNextEntry(TypedValue *key,
+                   const std::uint8_t **value,
+                   std::size_t *entry_num) const {
+  if (*entry_num < header_->buckets_allocated.load(std::memory_order_relaxed)) {
+    const char *bucket =
+        static_cast<const char *>(buckets_) + (*entry_num) * bucket_size_;
+    *key = key_manager_.getKeyComponentTyped(bucket, 0);
+    *value = reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset);
+    ++(*entry_num);
+    return true;
+  } else {
+    return false;
+  }
+}
+
+
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
     ::getNextEntryCompositeKey(std::vector<TypedValue> *key,
                                const std::uint8_t **value,
                                std::size_t *entry_num) const {
@@ -513,6 +546,28 @@ inline bool PackedPayloadSeparateChainingAggregationStateHashTable
   }
 }
 
+template <typename FunctorT>
+inline bool PackedPayloadSeparateChainingAggregationStateHashTable
+    ::upsertCompositeKey(const std::vector<TypedValue> &key,
+                         FunctorT *functor,
+                         int index) {
+  const std::size_t variable_size =
+      calculateVariableLengthCompositeKeyCopySize(key);
+  for (;;) {
+    {
+      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
+      std::uint8_t *value =
+          upsertCompositeKeyInternal(key, variable_size);
+      if (value != nullptr) {
+        (*functor)(value + payload_offsets_[index]);
+        return true;
+      }
+    }
+    resize(0, variable_size);
+  }
+}
+
+
 inline std::uint8_t* PackedPayloadSeparateChainingAggregationStateHashTable
     ::upsertCompositeKeyInternal(const std::vector<TypedValue> &key,
                                  const std::size_t variable_key_size) {
@@ -690,6 +745,35 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
     ::forEach(FunctorT *functor) const {
   std::size_t entries_visited = 0;
   std::size_t entry_num = 0;
+  TypedValue key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntry(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr);
+  }
+  return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
+    ::forEach(FunctorT *functor, const int index) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
+  TypedValue key;
+  const std::uint8_t *value_ptr;
+  while (getNextEntry(&key, &value_ptr, &entry_num)) {
+    ++entries_visited;
+    (*functor)(key, value_ptr + payload_offsets_[index]);
+    key.clear();
+  }
+  return entries_visited;
+}
+
+template <typename FunctorT>
+inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
+    ::forEachCompositeKey(FunctorT *functor) const {
+  std::size_t entries_visited = 0;
+  std::size_t entry_num = 0;
   std::vector<TypedValue> key;
   const std::uint8_t *value_ptr;
   while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
@@ -702,7 +786,8 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
 
 template <typename FunctorT>
 inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
-    ::forEach(FunctorT *functor, const int index) const {
+    ::forEachCompositeKey(FunctorT *functor,
+                          const int index) const {
   std::size_t entries_visited = 0;
   std::size_t entry_num = 0;
   std::vector<TypedValue> key;
@@ -715,7 +800,6 @@ inline std::size_t PackedPayloadSeparateChainingAggregationStateHashTable
   return entries_visited;
 }
 
-
 }  // namespace quickstep
 
 #endif  // QUICKSTEP_STORAGE_PACKED_PAYLOAD_AGGREGATION_STATE_HASH_TABLE_HPP_


[2/2] incubator-quickstep git commit: updates

Posted by ji...@apache.org.
updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/963a6042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/963a6042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/963a6042

Branch: refs/heads/collision-free-agg
Commit: 963a604288e158b953f99fac16ff03e0015d9860
Parents: 3fc85b2
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Tue Jan 31 21:52:01 2017 -0600
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Jan 31 21:52:01 2017 -0600

----------------------------------------------------------------------
 .../aggregation/AggregationHandleDistinct.cpp   |  53 ----------
 .../aggregation/AggregationHandleDistinct.hpp   | 106 -------------------
 expressions/aggregation/CMakeLists.txt          |  13 ---
 query_optimizer/ExecutionGenerator.cpp          |   5 +
 storage/AggregationOperationState.cpp           |  42 ++++++--
 storage/CMakeLists.txt                          |   1 -
 6 files changed, 38 insertions(+), 182 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/expressions/aggregation/AggregationHandleDistinct.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.cpp b/expressions/aggregation/AggregationHandleDistinct.cpp
deleted file mode 100644
index 1886335..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "storage/PackedPayloadAggregationStateHashTable.hpp"
-
-#include "types/TypedValue.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class ColumnVector;
-
-ColumnVector* AggregationHandleDistinct::finalizeHashTable(
-    const AggregationStateHashTableBase &hash_table,
-    std::vector<std::vector<TypedValue>> *group_by_keys,
-    int index) const {
-  DCHECK(group_by_keys->empty());
-
-  const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
-                                               const std::uint8_t *dumb_placeholder) -> void {
-    group_by_keys->emplace_back(std::move(group_by_key));
-  };
-  static_cast<const PackedPayloadSeparateChainingAggregationStateHashTable &>(
-      hash_table).forEachCompositeKey(&keys_retriever);
-
-  return nullptr;
-}
-
-}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/expressions/aggregation/AggregationHandleDistinct.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleDistinct.hpp b/expressions/aggregation/AggregationHandleDistinct.hpp
deleted file mode 100644
index 0d8905b..0000000
--- a/expressions/aggregation/AggregationHandleDistinct.hpp
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- **/
-
-#ifndef QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-#define QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_
-
-#include <cstddef>
-#include <memory>
-#include <vector>
-
-#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationConcreteHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
-#include "storage/HashTableBase.hpp"
-#include "types/TypedValue.hpp"
-#include "utility/Macros.hpp"
-
-#include "glog/logging.h"
-
-namespace quickstep {
-
-class AggregationState;
-class ColumnVector;
-class StorageManager;
-class Type;
-class ValueAccessor;
-
-/** \addtogroup Expressions
- *  @{
- */
-
-class AggregationHandleDistinct : public AggregationConcreteHandle {
- public:
-  /**
-   * @brief Constructor.
-   **/
-  AggregationHandleDistinct()
-      : AggregationConcreteHandle(AggregationID::kDistinct) {}
-
-  std::vector<const Type *> getArgumentTypes() const override {
-    return {};
-  }
-
-  const Type* getResultType() const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support getResultType().";
-  }
-
-  AggregationState* createInitialState() const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support createInitialState().";
-  }
-
-  AggregationState* accumulateNullary(
-      const std::size_t num_tuples) const override {
-    LOG(FATAL)
-        << "AggregationHandleDistinct does not support accumulateNullary().";
-  }
-
-  AggregationState* accumulate(
-      ValueAccessor *accessor,
-      ColumnVectorsValueAccessor *aux_accessor,
-      const std::vector<attribute_id> &argument_ids) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support "
-                  "accumulate().";
-  }
-
-  void mergeStates(const AggregationState &source,
-                   AggregationState *destination) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support mergeStates().";
-  }
-
-  TypedValue finalize(const AggregationState &state) const override {
-    LOG(FATAL) << "AggregationHandleDistinct does not support finalize().";
-  }
-
-  ColumnVector* finalizeHashTable(
-      const AggregationStateHashTableBase &hash_table,
-      std::vector<std::vector<TypedValue>> *group_by_keys,
-      int index) const override;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(AggregationHandleDistinct);
-};
-
-/** @} */
-
-}  // namespace quickstep
-
-#endif  // QUICKSTEP_EXPRESSIONS_AGGREGATION_AGGREGATION_HANDLE_DISTINCT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index bd239d4..432da09 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -55,9 +55,6 @@ add_library(quickstep_expressions_aggregation_AggregationHandleAvg
 add_library(quickstep_expressions_aggregation_AggregationHandleCount
             AggregationHandleCount.cpp
             AggregationHandleCount.hpp)
-add_library(quickstep_expressions_aggregation_AggregationHandleDistinct
-            AggregationHandleDistinct.cpp
-            AggregationHandleDistinct.hpp)
 add_library(quickstep_expressions_aggregation_AggregationHandleMax
             AggregationHandleMax.cpp
             AggregationHandleMax.hpp)
@@ -193,15 +190,6 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorUtil
                       quickstep_utility_Macros)
-target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinct
-                      glog
-                      quickstep_catalog_CatalogTypedefs
-                      quickstep_expressions_aggregation_AggregationConcreteHandle
-                      quickstep_expressions_aggregation_AggregationID
-                      quickstep_storage_HashTableBase
-                      quickstep_storage_PackedPayloadAggregationStateHashTable
-                      quickstep_types_TypedValue
-                      quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
                       glog
                       quickstep_catalog_CatalogTypedefs
@@ -267,7 +255,6 @@ target_link_libraries(quickstep_expressions_aggregation
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationHandleAvg
                       quickstep_expressions_aggregation_AggregationHandleCount
-                      quickstep_expressions_aggregation_AggregationHandleDistinct
                       quickstep_expressions_aggregation_AggregationHandleMax
                       quickstep_expressions_aggregation_AggregationHandleMin
                       quickstep_expressions_aggregation_AggregationHandleSum

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6694001..d32505b 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -429,6 +429,11 @@ bool ExecutionGenerator::canUseCollisionFreeAggregation(
   for (const auto &agg_expr : aggregate->aggregate_expressions()) {
     const E::AggregateFunctionPtr agg_func =
         std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+    // Collision-free aggregation cannot be used for DISTINCT aggregates.
+    if (agg_func->is_distinct()) {
+      return false;
+    }
+
     switch (agg_func->getAggregate().getAggregationID()) {
       case AggregationID::kCount:  // Fall through
       case AggregationID::kSum:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index d04af81..1bc5832 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -34,7 +34,6 @@
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunctionFactory.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationHandleDistinct.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
@@ -676,14 +675,25 @@ void AggregationOperationState::finalizeHashTableImplCollisionFree(
 void AggregationOperationState::finalizeHashTableImplPartitioned(
     const std::size_t partition_id,
     InsertDestination *output_destination) {
+  PackedPayloadSeparateChainingAggregationStateHashTable *hash_table =
+      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+          partitioned_group_by_hashtable_pool_->getHashTable(partition_id));
+
   // Each element of 'group_by_keys' is a vector of values for a particular
   // group (which is also the prefix of the finalized Tuple for that group).
   std::vector<std::vector<TypedValue>> group_by_keys;
 
+  if (handles_.empty()) {
+    // No aggregate handles (e.g. DISTINCT): output is just the group-by keys.
+    const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+                                                 const std::uint8_t * /* payload */) -> void {
+      group_by_keys.emplace_back(std::move(group_by_key));
+    };
+    hash_table->forEachCompositeKey(&keys_retriever);
+  }
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
-  AggregationStateHashTableBase *hash_table =
-      partitioned_group_by_hashtable_pool_->getHashTable(partition_id);
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
     ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
         *hash_table, &group_by_keys, agg_idx);
@@ -737,10 +747,6 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
 
 void AggregationOperationState::finalizeHashTableImplThreadPrivate(
     InsertDestination *output_destination) {
-  // Each element of 'group_by_keys' is a vector of values for a particular
-  // group (which is also the prefix of the finalized Tuple for that group).
-  std::vector<std::vector<TypedValue>> group_by_keys;
-
   // TODO(harshad) - The merge phase may be slower when each hash table contains
   // large number of entries. We should find ways in which we can perform a
   // parallel merge.
@@ -754,15 +760,33 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
     return;
   }
 
-  std::unique_ptr<AggregationStateHashTableBase> final_hash_table(
+  std::unique_ptr<AggregationStateHashTableBase> final_hash_table_ptr(
       hash_tables->back().release());
   for (std::size_t i = 0; i < hash_tables->size() - 1; ++i) {
     std::unique_ptr<AggregationStateHashTableBase> hash_table(
         hash_tables->at(i).release());
-    mergeGroupByHashTables(hash_table.get(), final_hash_table.get());
+    mergeGroupByHashTables(hash_table.get(), final_hash_table_ptr.get());
     hash_table->destroyPayload();
   }
 
+  PackedPayloadSeparateChainingAggregationStateHashTable *final_hash_table =
+      static_cast<PackedPayloadSeparateChainingAggregationStateHashTable *>(
+          final_hash_table_ptr.get());
+
+  // Each element of 'group_by_keys' is a vector of values for a particular
+  // group (which is also the prefix of the finalized Tuple for that group).
+  std::vector<std::vector<TypedValue>> group_by_keys;
+
+  if (handles_.empty()) {
+    // No aggregate handles (e.g. DISTINCT): output is just the group-by keys.
+    const auto keys_retriever = [&group_by_keys](std::vector<TypedValue> &group_by_key,
+                                                 const std::uint8_t * /* payload */) -> void {
+      group_by_keys.emplace_back(std::move(group_by_key));
+    };
+
+    final_hash_table->forEachCompositeKey(&keys_retriever);
+  }
+
   // Collect per-aggregate finalized values.
   std::vector<std::unique_ptr<ColumnVector>> final_values;
   for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/963a6042/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index c7bc28f..4ff612e 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -274,7 +274,6 @@ target_link_libraries(quickstep_storage_AggregationOperationState
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunctionFactory
                       quickstep_expressions_aggregation_AggregationHandle
-                      quickstep_expressions_aggregation_AggregationHandleDistinct
                       quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar