You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/04/09 01:48:54 UTC
[14/14] incubator-quickstep git commit: Thread private numeric aggregation
Thread private numeric aggregation
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cc1a86fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cc1a86fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cc1a86fc
Branch: refs/heads/common-subexpression
Commit: cc1a86fc481230c6f61f710917b3a1194283b97b
Parents: ab3a667
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Sat Apr 8 19:04:12 2017 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Sat Apr 8 20:48:27 2017 -0500
----------------------------------------------------------------------
query_optimizer/ExecutionGenerator.cpp | 15 +-
.../cost_model/StarSchemaSimpleCostModel.cpp | 51 ++
.../cost_model/StarSchemaSimpleCostModel.hpp | 3 +
storage/AggregationOperationState.cpp | 57 ++-
storage/AggregationOperationState.hpp | 6 +-
storage/CMakeLists.txt | 20 +
storage/CollisionFreeVectorTable.hpp | 4 +
storage/HashTable.proto | 1 +
storage/HashTableBase.hpp | 5 +-
storage/HashTableFactory.hpp | 13 +-
storage/HashTablePool.hpp | 4 +
storage/PackedPayloadHashTable.hpp | 4 +
storage/ThreadPrivateNumericHashTable.cpp | 0
storage/ThreadPrivateNumericHashTable.hpp | 483 +++++++++++++++++++
14 files changed, 650 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6fec85b..387f26a 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1512,14 +1512,23 @@ void ExecutionGenerator::convertAggregate(
->canUseCollisionFreeAggregation(physical_plan,
estimated_num_groups,
&max_num_groups)) {
+ std::cout << "Use collision free\n";
aggr_state_proto->set_hash_table_impl_type(
serialization::HashTableImplType::COLLISION_FREE_VECTOR);
aggr_state_proto->set_estimated_num_entries(max_num_groups);
use_parallel_initialization = true;
} else {
- // Otherwise, use SeparateChaining.
- aggr_state_proto->set_hash_table_impl_type(
- serialization::HashTableImplType::SEPARATE_CHAINING);
+ if (cost_model_for_aggregation_->canUseTwoPhaseNumericAggregation(
+ physical_plan, estimated_num_groups)) {
+ std::cout << "Use two phase numeric\n";
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC);
+ } else {
+ // Otherwise, use SeparateChaining.
+ std::cout << "Use normal\n";
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ }
aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index fc775c7..d70e156 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -688,6 +688,57 @@ bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
return true;
}
+// Decides whether an aggregation qualifies for the two-phase (thread private)
+// numeric aggregation. Returns true only if all of the following hold:
+//   - fewer than 1000 groups are estimated;
+//   - every grouping key is fixed-length with a byte length of 1, 2, 4 or 8,
+//     and the byte lengths of all grouping keys sum to at most 8;
+//   - every aggregate is non-DISTINCT and is either COUNT, or SUM over an
+//     Int, Long, Float or Double argument.
+bool StarSchemaSimpleCostModel::canUseTwoPhaseNumericAggregation(
+    const physical::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups) {
+  if (estimated_num_groups >= 1000u) {
+    return false;
+  }
+
+  // Validate the grouping keys while accumulating their combined width.
+  std::size_t packed_key_bytes = 0;
+  for (const auto &grouping_expr : aggregate->grouping_expressions()) {
+    const Type &key_type = grouping_expr->getValueType();
+    if (key_type.isVariableLength()) {
+      return false;
+    }
+
+    const std::size_t key_bytes = key_type.maximumByteLength();
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(key_bytes, 1u, 2u, 4u, 8u)) {
+      return false;
+    }
+
+    packed_key_bytes += key_bytes;
+    if (packed_key_bytes > 8u) {
+      return false;
+    }
+  }
+
+  // Validate the aggregate functions.
+  for (const auto &aggregate_alias : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr aggregate_function =
+        std::static_pointer_cast<const E::AggregateFunction>(aggregate_alias->expression());
+    if (aggregate_function->is_distinct()) {
+      return false;
+    }
+
+    const auto aggregation_id = aggregate_function->getAggregate().getAggregationID();
+    if (aggregation_id == AggregationID::kCount) {
+      // COUNT is accepted without looking at its arguments.
+      continue;
+    }
+    if (aggregation_id != AggregationID::kSum) {
+      return false;
+    }
+
+    // SUM: the single argument must have one of the four numeric types.
+    DCHECK_EQ(1u, aggregate_function->getArguments().size());
+    const auto &argument = aggregate_function->getArguments().front();
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(argument->getValueType().getTypeID(),
+                                       kInt, kLong, kFloat, kDouble)) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
} // namespace cost
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index afb2ef9..aa09c31 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -183,6 +183,9 @@ class StarSchemaSimpleCostModel : public CostModel {
const std::size_t estimated_num_groups,
std::size_t *max_num_groups);
+ bool canUseTwoPhaseNumericAggregation(const physical::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups);
+
private:
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index facc7fa..d6f21bc 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -49,6 +49,7 @@
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/SubBlocksReference.hpp"
+#include "storage/ThreadPrivateNumericHashTable.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
@@ -94,11 +95,15 @@ AggregationOperationState::AggregationOperationState(
!is_distinct_.empty(), std::logical_and<bool>())),
storage_manager_(storage_manager) {
if (!group_by.empty()) {
- if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
- is_aggregate_collision_free_ = true;
- } else {
- is_aggregate_partitioned_ = checkAggregatePartitioned(
- estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+ switch (hash_table_impl_type) {
+ case HashTableImplType::kCollisionFreeVector:
+ is_aggregate_collision_free_ = true;
+ break;
+ case HashTableImplType::kThreadPrivateNumeric:
+ break;
+ default:
+ is_aggregate_partitioned_ = checkAggregatePartitioned(
+ estimated_num_entries, is_distinct_, group_by, aggregate_functions);
}
}
@@ -715,7 +720,17 @@ void AggregationOperationState::finalizeHashTable(
finalizeHashTableImplPartitioned(partition_id, output_destination);
} else {
DCHECK_EQ(0u, partition_id);
- finalizeHashTableImplThreadPrivate(output_destination);
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+ switch (group_by_hashtable_pool_->getHashTableImplType()) {
+ case HashTableImplType::kSeparateChaining:
+ finalizeHashTableImplThreadPrivatePackedPayload(output_destination);
+ break;
+ case HashTableImplType::kThreadPrivateNumeric:
+ finalizeHashTableImplThreadPrivateNumeric(output_destination);
+ break;
+ default:
+ LOG(FATAL) << "Not supported";
+ }
}
}
@@ -840,7 +855,7 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
output_destination->bulkInsertTuples(&complete_result);
}
-void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+void AggregationOperationState::finalizeHashTableImplThreadPrivatePackedPayload(
InsertDestination *output_destination) {
// TODO(harshad) - The merge phase may be slower when each hash table contains
// large number of entries. We should find ways in which we can perform a
@@ -948,6 +963,34 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
output_destination->bulkInsertTuples(&complete_result);
}
+// Finalizes a thread-private numeric aggregation: every hash table held by
+// group_by_hashtable_pool_ is merged into the pool's last table, and the
+// merged table is then finalized and bulk-inserted into output_destination.
+// Nothing is inserted when the pool holds no hash tables.
+void AggregationOperationState::finalizeHashTableImplThreadPrivateNumeric(
+    InsertDestination *output_destination) {
+  auto *hash_tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(hash_tables != nullptr);
+  if (hash_tables->empty()) {
+    return;
+  }
+
+  // Take ownership of the last table; it serves as the merge target.
+  std::unique_ptr<ThreadPrivateNumericHashTable> merged_table(
+      static_cast<ThreadPrivateNumericHashTable*>(hash_tables->back().release()));
+
+  // Fold each remaining table into the target. Each source table is owned by
+  // a scoped unique_ptr and is therefore deleted at the end of its iteration.
+  const std::size_t num_source_tables = hash_tables->size() - 1;
+  for (std::size_t table_idx = 0; table_idx < num_source_tables; ++table_idx) {
+    std::unique_ptr<AggregationStateHashTableBase> source_table(
+        hash_tables->at(table_idx).release());
+    merged_table->merge(
+        static_cast<const ThreadPrivateNumericHashTable*>(source_table.get()));
+    source_table->destroyPayload();
+  }
+
+  ColumnVectorsValueAccessor complete_result;
+  merged_table->finalize(&complete_result);
+  merged_table->destroyPayload();
+
+  // Bulk-insert the complete result.
+  output_destination->bulkInsertTuples(&complete_result);
+}
+
std::size_t AggregationOperationState::getMemoryConsumptionBytes() const {
std::size_t memory = getMemoryConsumptionBytesHelper(distinctify_hashtables_);
memory += getMemoryConsumptionBytesHelper(group_by_hashtables_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e6af494..e666a68 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -256,7 +256,11 @@ class AggregationOperationState {
void finalizeHashTableImplPartitioned(const std::size_t partition_id,
InsertDestination *output_destination);
- void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
+ void finalizeHashTableImplThreadPrivatePackedPayload(
+ InsertDestination *output_destination);
+
+ void finalizeHashTableImplThreadPrivateNumeric(
+ InsertDestination *output_destination);
std::size_t getMemoryConsumptionBytesHelper(
const std::vector<std::unique_ptr<AggregationStateHashTableBase>>
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 0a1d484..b971240 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -250,6 +250,9 @@ add_library(quickstep_storage_StorageManager StorageManager.cpp StorageManager.h
add_library(quickstep_storage_SubBlockTypeRegistry SubBlockTypeRegistry.cpp SubBlockTypeRegistry.hpp)
add_library(quickstep_storage_SubBlockTypeRegistryMacros ../empty_src.cpp SubBlockTypeRegistryMacros.hpp)
add_library(quickstep_storage_SubBlocksReference ../empty_src.cpp SubBlocksReference.hpp)
+add_library(quickstep_storage_ThreadPrivateNumericHashTable
+ ThreadPrivateNumericHashTable.cpp
+ ThreadPrivateNumericHashTable.hpp)
add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.hpp)
add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
@@ -289,6 +292,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
quickstep_storage_SubBlocksReference
+ quickstep_storage_ThreadPrivateNumericHashTable
quickstep_storage_TupleIdSequence
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
@@ -724,6 +728,7 @@ target_link_libraries(quickstep_storage_HashTableFactory
quickstep_storage_PackedPayloadHashTable
quickstep_storage_SeparateChainingHashTable
quickstep_storage_SimpleScalarSeparateChainingHashTable
+ quickstep_storage_ThreadPrivateNumericHashTable
quickstep_storage_TupleReference
quickstep_types_Type
quickstep_types_TypeFactory
@@ -1039,6 +1044,20 @@ target_link_libraries(quickstep_storage_SubBlockTypeRegistry
target_link_libraries(quickstep_storage_SubBlocksReference
glog
quickstep_utility_PtrVector)
+target_link_libraries(quickstep_storage_ThreadPrivateNumericHashTable
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_HashTableBase
+ quickstep_storage_ValueAccessorMultiplexer
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_utility_Macros
+ quickstep_utility_ScopedBuffer)
target_link_libraries(quickstep_storage_TupleIdSequence
quickstep_storage_StorageBlockInfo
quickstep_utility_BitVector
@@ -1164,6 +1183,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_SubBlockTypeRegistry
quickstep_storage_SubBlockTypeRegistryMacros
quickstep_storage_SubBlocksReference
+ quickstep_storage_ThreadPrivateNumericHashTable
quickstep_storage_TupleIdSequence
quickstep_storage_TupleReference
quickstep_storage_TupleStorageSubBlock
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 490a5cc..221a221 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -70,6 +70,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
~CollisionFreeVectorTable() override;
+ /**
+   * @brief Get the implementation type of this hash table.
+   *
+   * @return HashTableImplType::kCollisionFreeVector.
+   **/
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kCollisionFreeVector;
+  }
+
void destroyPayload() override;
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 6839ebc..80e363c 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -26,6 +26,7 @@ enum HashTableImplType {
LINEAR_OPEN_ADDRESSING = 1;
SEPARATE_CHAINING = 2;
SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
+ THREAD_PRIVATE_NUMERIC = 4;
}
// NOTE(chasseur): This proto describes the run-time parameters for a resizable
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 8be388a..c3cbddf 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -44,7 +44,8 @@ enum class HashTableImplType {
kCollisionFreeVector,
kLinearOpenAddressing,
kSeparateChaining,
- kSimpleScalarSeparateChaining
+ kSimpleScalarSeparateChaining,
+ kThreadPrivateNumeric
};
/**
@@ -117,6 +118,8 @@ class AggregationStateHashTableBase {
virtual std::size_t getMemoryConsumptionBytes() const = 0;
+ virtual HashTableImplType getImplType() const = 0;
+
protected:
AggregationStateHashTableBase() {}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 9686429..52f4d5f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -32,6 +32,7 @@
#include "storage/PackedPayloadHashTable.hpp"
#include "storage/SeparateChainingHashTable.hpp"
#include "storage/SimpleScalarSeparateChainingHashTable.hpp"
+#include "storage/ThreadPrivateNumericHashTable.hpp"
#include "storage/TupleReference.hpp"
#include "types/TypeFactory.hpp"
#include "utility/BloomFilter.hpp"
@@ -123,6 +124,8 @@ inline HashTableImplType HashTableImplTypeFromProto(
return HashTableImplType::kSeparateChaining;
case serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING:
return HashTableImplType::kSimpleScalarSeparateChaining;
+ case serialization::HashTableImplType::THREAD_PRIVATE_NUMERIC:
+ return HashTableImplType::kThreadPrivateNumeric;
default: {
LOG(FATAL) << "Unrecognized serialization::HashTableImplType\n";
}
@@ -355,7 +358,6 @@ class AggregationStateHashTableFactory {
* hash table constructor.
* @return A new aggregation state hash table.
**/
-
static AggregationStateHashTableBase* CreateResizable(
const HashTableImplType hash_table_type,
const std::vector<const Type*> &key_types,
@@ -363,13 +365,16 @@ class AggregationStateHashTableFactory {
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager) {
switch (hash_table_type) {
- case HashTableImplType::kSeparateChaining:
- return new PackedPayloadHashTable(
- key_types, num_entries, handles, storage_manager);
case HashTableImplType::kCollisionFreeVector:
DCHECK_EQ(1u, key_types.size());
return new CollisionFreeVectorTable(
key_types.front(), num_entries, handles, storage_manager);
+ case HashTableImplType::kSeparateChaining:
+ return new PackedPayloadHashTable(
+ key_types, num_entries, handles, storage_manager);
+ case HashTableImplType::kThreadPrivateNumeric:
+ return new ThreadPrivateNumericHashTable(
+ key_types, num_entries, handles, storage_manager);
default: {
LOG(FATAL) << "Unrecognized HashTableImplType in "
<< "AggregationStateHashTableFactory::createResizable()";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 6dbd7f9..7257906 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -75,6 +75,10 @@ class HashTablePool {
handles_(handles),
storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ /**
+   * @brief Get the hash table implementation type stored in this pool.
+   *
+   * @return The value of hash_table_impl_type_.
+   **/
+  HashTableImplType getHashTableImplType() const {
+    return hash_table_impl_type_;
+  }
+
/**
* @brief Check out a hash table for insertion.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 960d5a7..3e89aab 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -88,6 +88,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
~PackedPayloadHashTable() override;
+ /**
+   * @brief Get the implementation type of this hash table.
+   *
+   * @return HashTableImplType::kSeparateChaining.
+   **/
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kSeparateChaining;
+  }
+
/**
* @brief Erase all entries in this hash table.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/ThreadPrivateNumericHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateNumericHashTable.cpp b/storage/ThreadPrivateNumericHashTable.cpp
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cc1a86fc/storage/ThreadPrivateNumericHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateNumericHashTable.hpp b/storage/ThreadPrivateNumericHashTable.hpp
new file mode 100644
index 0000000..2991900
--- /dev/null
+++ b/storage/ThreadPrivateNumericHashTable.hpp
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/Macros.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ThreadPrivateNumericHashTable : public AggregationStateHashTableBase {
+ public:
+ /**
+   * @brief Constructor. Records the byte width of every key, lays out the
+   *        per-bucket aggregation states back to back, and allocates the key
+   *        and bucket storage for num_entries buckets.
+   *
+   * @param key_types The group-by key types. Each must be fixed-length with a
+   *        maximum byte length of 1, 2, 4 or 8 (checked in debug builds).
+   * @param num_entries The number of buckets to allocate.
+   * @param handles The aggregation handles. Only COUNT, and SUM over Int,
+   *        Long, Float or Double, are accepted; anything else is fatal.
+   * @param storage_manager Not used by this constructor.
+   **/
+  ThreadPrivateNumericHashTable(
+      const std::vector<const Type *> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle *> &handles,
+      StorageManager *storage_manager)
+      : key_types_(key_types),
+        handles_(handles),
+        bucket_size_(0),
+        num_buckets_(num_entries),
+        buckets_allocated_(0) {
+    // Remember the byte width of each key component.
+    for (const Type *key_type : key_types) {
+      DCHECK(!key_type->isVariableLength());
+
+      const std::size_t key_width = key_type->maximumByteLength();
+      DCHECK(key_width == 1u || key_width == 2u || key_width == 4u || key_width == 8u);
+
+      key_sizes_.emplace_back(key_width);
+    }
+
+    // Each state starts where the previous one ended; bucket_size_ is the
+    // running total and ends up as the size of a whole bucket.
+    for (const AggregationHandle *handle : handles) {
+      state_offsets_.emplace_back(bucket_size_);
+
+      const std::vector<const Type*> argument_types = handle->getArgumentTypes();
+      DCHECK_LE(argument_types.size(), 1u);
+
+      std::size_t state_width = 0;
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount:
+          state_width = sizeof(std::int64_t);
+          break;
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, argument_types.size());
+          const TypeID argument_type_id = argument_types.front()->getTypeID();
+          if (argument_type_id == TypeID::kInt || argument_type_id == TypeID::kLong) {
+            // Integer sums are kept in a 64-bit integer state.
+            state_width = sizeof(std::int64_t);
+          } else if (argument_type_id == TypeID::kFloat || argument_type_id == TypeID::kDouble) {
+            // Floating-point sums are kept in a double state.
+            state_width = sizeof(double);
+          } else {
+            LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+      bucket_size_ += state_width;
+    }
+
+    // One 64-bit key code and one bucket of states per entry.
+    keys_.reset(sizeof(std::uint64_t) * num_buckets_);
+    buckets_.reset(bucket_size_ * num_buckets_);
+  }
+
+ /**
+   * @brief Destructor. Has an empty body.
+   **/
+  ~ThreadPrivateNumericHashTable() override {}
+
+  /**
+   * @brief Get the implementation type of this hash table.
+   *
+   * @return HashTableImplType::kThreadPrivateNumeric.
+   **/
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kThreadPrivateNumeric;
+  }
+
+  /**
+   * @brief No-op for this table.
+   **/
+  void destroyPayload() override {}
+
+  /**
+   * @brief Get the size of the key and bucket storage.
+   *
+   * @return num_buckets_ times the per-entry footprint (one 64-bit key code
+   *         plus one bucket of bucket_size_ bytes). The footprint of index_
+   *         is not included.
+   **/
+  std::size_t getMemoryConsumptionBytes() const override {
+    const std::size_t bytes_per_entry = bucket_size_ + sizeof(std::uint64_t);
+    return num_buckets_ * bytes_per_entry;
+  }
+
+  /**
+   * @brief Get the number of buckets handed out so far.
+   *
+   * @return The value of buckets_allocated_.
+   **/
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+ /**
+   * @brief Aggregate all tuples reachable through accessor_mux into this
+   *        table.
+   *
+   * The key attributes of each tuple are copied side by side into one 64-bit
+   * key code, each code is mapped to a bucket (allocating a new bucket for a
+   * code seen for the first time), and every aggregation handle then updates
+   * its state in the bucket of each tuple.
+   *
+   * @note The table does not grow. Needing more than num_buckets_ distinct
+   *       key codes is a fatal error rather than a write past the end of the
+   *       key and bucket storage.
+   *
+   * @param argument_ids For each handle, the attribute ids of its arguments.
+   *        Only consulted for SUM handles, which must have exactly one.
+   * @param key_attr_ids The attribute ids of the group-by keys, in the same
+   *        order as the key types given to the constructor.
+   * @param accessor_mux Supplies the base accessor (must be non-null) and
+   *        the derived accessor.
+   * @return Always true.
+   **/
+  bool upsertValueAccessorCompositeKey(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      const ValueAccessorMultiplexer &accessor_mux) override {
+    ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+    ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+
+    DCHECK(base_accessor != nullptr);
+    const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
+
+    // Phase 1: build one 64-bit key code per tuple. Each key component is
+    // written at its own byte offset within the code.
+    ScopedBuffer buffer(sizeof(std::uint64_t) * num_tuples);
+    std::uint64_t *key_codes = static_cast<std::uint64_t*>(buffer.get());
+    std::size_t key_code_offset = 0;
+    for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
+      const auto &key_attr_id = key_attr_ids[i];
+      ValueAccessor *accessor =
+          key_attr_id.source == ValueAccessorSource::kBase
+              ? base_accessor
+              : derived_accessor;
+      DCHECK(accessor != nullptr);
+
+      const std::size_t key_size = key_sizes_[i];
+      switch (key_size) {
+        case 1u:
+          ConstructKeyCode<std::uint8_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 2u:
+          ConstructKeyCode<std::uint16_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 4u:
+          ConstructKeyCode<std::uint32_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        case 8u:
+          ConstructKeyCode<std::uint64_t>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+          break;
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+
+      key_code_offset += key_size;
+    }
+
+    // Phase 2: resolve every key code to a bucket index.
+    std::vector<BucketIndex> bucket_indices;
+    bucket_indices.reserve(num_tuples);
+    std::uint64_t *keys = static_cast<std::uint64_t*>(keys_.get());
+    for (std::size_t i = 0; i < num_tuples; ++i) {
+      const std::size_t code = key_codes[i];
+      const auto index_it = index_.find(code);
+      if (index_it == index_.end()) {
+        // TODO: Resize instead of aborting once the table can grow. Until
+        // then, refuse to write beyond the allocated keys/buckets.
+        CHECK_LT(buckets_allocated_, num_buckets_)
+            << "ThreadPrivateNumericHashTable is full and cannot be resized";
+        index_.emplace(code, buckets_allocated_);
+        bucket_indices.emplace_back(buckets_allocated_);
+        keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Phase 3: let each handle update its state, dispatching on the
+    // aggregation function and (for SUM) on the argument type.
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          upsertValueAccessorCount(bucket_indices, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          DCHECK_EQ(1u, argument_ids[i].size());
+          const auto &argument_id = argument_ids[i].front();
+          ValueAccessor *accessor =
+              argument_id.source == ValueAccessorSource::kBase
+                  ? base_accessor
+                  : derived_accessor;
+          DCHECK(accessor != nullptr);
+
+          DCHECK_EQ(1u, handle->getArgumentTypes().size());
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt: {
+              upsertValueAccessorSum<int, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kLong: {
+              upsertValueAccessorSum<std::int64_t, std::int64_t>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kFloat: {
+              upsertValueAccessorSum<float, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            case kDouble: {
+              upsertValueAccessorSum<double, double>(
+                  bucket_indices, state_offsets_[i], argument_id.attr_id, accessor);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+
+    return true;
+  }
+
+ /**
+   * @brief Merge the contents of another table into this one.
+   *
+   * Every key code stored in other is mapped to a bucket of this table
+   * (allocating a new bucket for a code this table has not seen), and each
+   * state of other is then added to the matching state of this table.
+   *
+   * @note The bucket layout of this table (state_offsets_, bucket_size_) is
+   *       applied to other's storage, so other must have been built with the
+   *       same handles. The table does not grow: needing more than
+   *       num_buckets_ distinct key codes is a fatal error rather than a
+   *       write past the end of the key and bucket storage.
+   *
+   * @param other The table to merge from. It is not modified.
+   **/
+  void merge(const ThreadPrivateNumericHashTable *other) {
+    std::vector<BucketIndex> dst_bucket_indices;
+    // Exactly one destination index is recorded per source bucket.
+    dst_bucket_indices.reserve(other->buckets_allocated_);
+    std::uint64_t *dst_keys = static_cast<std::uint64_t*>(keys_.get());
+
+    const char *src_buckets_start =
+        static_cast<const char*>(other->buckets_.get());
+    const std::uint64_t *src_keys =
+        static_cast<const std::uint64_t*>(other->keys_.get());
+
+    // Resolve every source bucket to a destination bucket index.
+    for (std::size_t i = 0; i < other->buckets_allocated_; ++i) {
+      const std::uint64_t code = src_keys[i];
+      const auto index_it = index_.find(code);
+
+      if (index_it == index_.end()) {
+        // TODO: Resize instead of aborting once the table can grow. Until
+        // then, refuse to write beyond the allocated keys/buckets.
+        CHECK_LT(buckets_allocated_, num_buckets_)
+            << "ThreadPrivateNumericHashTable is full and cannot be resized";
+        index_.emplace(code, buckets_allocated_);
+        dst_bucket_indices.emplace_back(buckets_allocated_);
+        dst_keys[buckets_allocated_] = code;
+        ++buckets_allocated_;
+      } else {
+        dst_bucket_indices.emplace_back(index_it->second);
+      }
+    }
+
+    // Add up the states, dispatching on the state's storage type: COUNT and
+    // integer SUM use std::int64_t, floating-point SUM uses double.
+    for (std::size_t i = 0; i < handles_.size(); ++i) {
+      const AggregationHandle *handle = handles_[i];
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount: {
+          mergeStateSum<std::int64_t>(
+              dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+          break;
+        }
+        case AggregationID::kSum: {
+          const Type *argument_type = handle->getArgumentTypes().front();
+          switch (argument_type->getTypeID()) {
+            case kInt:  // Fall through
+            case kLong: {
+              mergeStateSum<std::int64_t>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            case kFloat:  // Fall through
+            case kDouble: {
+              mergeStateSum<double>(
+                  dst_bucket_indices, src_buckets_start, state_offsets_[i]);
+              break;
+            }
+            default:
+              LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+    }
+  }
+
+ /**
+   * @brief Debugging aid that writes to standard output.
+   *
+   * Prints the number of allocated buckets, then reads the bucket storage as
+   * a flat array of double and prints its first buckets_allocated_ elements.
+   * Neither bucket_size_ nor state_offsets_ is consulted, so the output does
+   * not follow the per-bucket state layout.
+   **/
+  void print() const {
+    std::cout << "num_entries = " << buckets_allocated_ << "\n";
+    const double *raw_values = static_cast<const double*>(buckets_.get());
+    for (std::size_t pos = 0; pos < buckets_allocated_; ++pos) {
+      std::cout << raw_values[pos] << "\n";
+    }
+  }
+
+ /**
+   * @brief Write the contents of this table into output as columns.
+   *
+   * One column is added per group-by key (in key order), followed by one
+   * column per aggregation handle (in handle order). Every column is created
+   * with room for buckets_allocated_ values.
+   *
+   * @param output The value accessor that receives the columns and takes
+   *        over the released column vector pointers.
+   **/
+  void finalize(ColumnVectorsValueAccessor *output) const {
+    // Key columns: each key component sits at its own byte offset inside
+    // the stored 64-bit key codes.
+    std::size_t offset_in_code = 0;
+    for (std::size_t key_idx = 0; key_idx < key_types_.size(); ++key_idx) {
+      std::unique_ptr<NativeColumnVector> key_column(
+          new NativeColumnVector(*key_types_[key_idx], buckets_allocated_));
+
+      const std::size_t key_width = key_sizes_[key_idx];
+      if (key_width == 1u) {
+        finalizeKey<std::uint8_t>(offset_in_code, key_column.get());
+      } else if (key_width == 2u) {
+        finalizeKey<std::uint16_t>(offset_in_code, key_column.get());
+      } else if (key_width == 4u) {
+        finalizeKey<std::uint32_t>(offset_in_code, key_column.get());
+      } else if (key_width == 8u) {
+        finalizeKey<std::uint64_t>(offset_in_code, key_column.get());
+      } else {
+        LOG(FATAL) << "Not implemented";
+      }
+      output->addColumn(key_column.release());
+
+      offset_in_code += key_width;
+    }
+
+    // State columns: COUNT and integer SUM are read as std::int64_t,
+    // floating-point SUM is read as double.
+    for (std::size_t handle_idx = 0; handle_idx < handles_.size(); ++handle_idx) {
+      const AggregationHandle *handle = handles_[handle_idx];
+      std::unique_ptr<NativeColumnVector> state_column(
+          new NativeColumnVector(*handle->getResultType(), buckets_allocated_));
+      const std::size_t state_offset = state_offsets_[handle_idx];
+
+      switch (handle->getAggregationID()) {
+        case AggregationID::kCount:
+          finalizeStateSum<std::int64_t, std::int64_t>(
+              state_offset, state_column.get());
+          break;
+        case AggregationID::kSum: {
+          const auto argument_type_id =
+              handle->getArgumentTypes().front()->getTypeID();
+          if (argument_type_id == kInt || argument_type_id == kLong) {
+            finalizeStateSum<std::int64_t, std::int64_t>(
+                state_offset, state_column.get());
+          } else if (argument_type_id == kFloat || argument_type_id == kDouble) {
+            finalizeStateSum<double, double>(
+                state_offset, state_column.get());
+          } else {
+            LOG(FATAL) << "Not implemented";
+          }
+          break;
+        }
+        default:
+          LOG(FATAL) << "Not implemented";
+      }
+      output->addColumn(state_column.release());
+    }
+  }
+
+ private:
+ using BucketIndex = std::uint32_t;
+
+ /**
+   * @brief Copy one key attribute of every tuple into an array of 64-bit
+   *        key codes.
+   *
+   * For each tuple visited by the accessor, a KeyT value is read from
+   * attr_id and stored at byte position offset within that tuple's code.
+   * Consecutive codes are sizeof(std::uint64_t) bytes apart.
+   *
+   * @param offset Byte offset of this key component within a key code.
+   * @param attr_id The attribute to read the key component from.
+   * @param accessor The value accessor to iterate (iteration is restarted).
+   * @param key_code_start Start of the key code array.
+   **/
+  template <typename KeyT>
+  inline static void ConstructKeyCode(const std::size_t offset,
+                                      const attribute_id attr_id,
+                                      ValueAccessor *accessor,
+                                      void *key_code_start) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+          char *dst = static_cast<char*>(key_code_start) + offset;
+          typed_accessor->beginIteration();
+          for (; typed_accessor->next(); dst += sizeof(std::uint64_t)) {
+            const void *src =
+                typed_accessor->template getUntypedValue<false>(attr_id);
+            *reinterpret_cast<KeyT*>(dst) = *static_cast<const KeyT*>(src);
+          }
+        });
+  }
+
+ /**
+   * @brief Add one to a COUNT state for every entry of bucket_indices.
+   *
+   * @param bucket_indices The bucket index of each tuple; a bucket that
+   *        appears k times is incremented k times.
+   * @param state_offset Byte offset of the std::int64_t state within a bucket.
+   **/
+  inline void upsertValueAccessorCount(const std::vector<BucketIndex> &bucket_indices,
+                                       const std::size_t state_offset) {
+    char *first_state = static_cast<char*>(buckets_.get()) + state_offset;
+    for (std::size_t pos = 0; pos < bucket_indices.size(); ++pos) {
+      std::int64_t *count_state = reinterpret_cast<std::int64_t*>(
+          first_state + bucket_size_ * bucket_indices[pos]);
+      *count_state += 1;
+    }
+  }
+
+ /**
+   * @brief Add the argument value of every tuple to the SUM state of the
+   *        tuple's bucket.
+   *
+   * The n-th tuple visited by the accessor is paired with
+   * bucket_indices[n].
+   *
+   * @param bucket_indices The bucket index of each tuple, in iteration order.
+   * @param state_offset Byte offset of the StateT state within a bucket.
+   * @param attr_id The attribute holding the ArgumentT value to add.
+   * @param accessor The value accessor to iterate (iteration is restarted).
+   **/
+  template <typename ArgumentT, typename StateT>
+  inline void upsertValueAccessorSum(const std::vector<BucketIndex> &bucket_indices,
+                                     const std::size_t state_offset,
+                                     const attribute_id attr_id,
+                                     ValueAccessor *accessor) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+          typed_accessor->beginIteration();
+
+          char *first_state = static_cast<char*>(buckets_.get()) + state_offset;
+          for (std::size_t tuple_pos = 0; typed_accessor->next(); ++tuple_pos) {
+            StateT *sum_state = reinterpret_cast<StateT*>(
+                first_state + bucket_size_ * bucket_indices[tuple_pos]);
+            const void *argument =
+                typed_accessor->template getUntypedValue<false>(attr_id);
+            *sum_state += *static_cast<const ArgumentT*>(argument);
+          }
+        });
+  }
+
+ /**
+   * @brief Add one state of every source bucket to the matching state of a
+   *        destination bucket in this table.
+   *
+   * Source bucket n is added into destination bucket dst_bucket_indices[n].
+   * Both sides are addressed with this table's bucket_size_.
+   *
+   * @param dst_bucket_indices For each source bucket, the destination bucket.
+   * @param src_buckets_start Start of the source bucket storage.
+   * @param state_offset Byte offset of the StateT state within a bucket.
+   **/
+  template <typename StateT>
+  inline void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
+                            const void *src_buckets_start,
+                            const std::size_t state_offset) {
+    char *dst_first_state = static_cast<char*>(buckets_.get()) + state_offset;
+    const char *src_state =
+        static_cast<const char*>(src_buckets_start) + state_offset;
+    for (const BucketIndex dst_bucket : dst_bucket_indices) {
+      StateT *dst_state = reinterpret_cast<StateT*>(
+          dst_first_state + bucket_size_ * dst_bucket);
+      *dst_state += *reinterpret_cast<const StateT*>(src_state);
+      // Source buckets are visited in storage order.
+      src_state += bucket_size_;
+    }
+  }
+
+ /**
+   * @brief Write one key component of every allocated bucket into a column.
+   *
+   * A KeyT value is read at byte position offset of each stored 64-bit key
+   * code and written through output_cv->getPtrForDirectWrite(), which is
+   * called once per bucket.
+   * NOTE(review): presumably each getPtrForDirectWrite() call yields the
+   * next free slot of the column -- confirm against NativeColumnVector.
+   *
+   * @param offset Byte offset of the key component within a key code.
+   * @param output_cv The column vector to write to.
+   **/
+  template <typename KeyT>
+  inline void finalizeKey(const std::size_t offset,
+                          NativeColumnVector *output_cv) const {
+    const char *first_key = static_cast<const char*>(keys_.get()) + offset;
+    for (std::size_t bucket = 0; bucket < buckets_allocated_; ++bucket) {
+      const char *key_src = first_key + bucket * sizeof(std::uint64_t);
+      KeyT *key_dst = static_cast<KeyT*>(output_cv->getPtrForDirectWrite());
+      *key_dst = *reinterpret_cast<const KeyT*>(key_src);
+    }
+  }
+
+ /**
+   * @brief Write one aggregation state of every allocated bucket into a
+   *        column.
+   *
+   * The StateT value at state_offset of each bucket is converted to ResultT
+   * and written through output_cv->getPtrForDirectWrite(), which is called
+   * once per bucket.
+   * NOTE(review): presumably each getPtrForDirectWrite() call yields the
+   * next free slot of the column -- confirm against NativeColumnVector.
+   *
+   * @param state_offset Byte offset of the state within a bucket.
+   * @param output_cv The column vector to write to.
+   **/
+  template <typename StateT, typename ResultT>
+  inline void finalizeStateSum(const std::size_t state_offset,
+                               NativeColumnVector *output_cv) const {
+    char *first_state = static_cast<char*>(buckets_.get()) + state_offset;
+    for (std::size_t bucket = 0; bucket < buckets_allocated_; ++bucket) {
+      const char *state_src = first_state + bucket * bucket_size_;
+      ResultT *result_dst = static_cast<ResultT*>(output_cv->getPtrForDirectWrite());
+      *result_dst = *reinterpret_cast<const StateT*>(state_src);
+    }
+  }
+
+ const std::vector<const Type*> key_types_;
+ const std::vector<AggregationHandle *> handles_;
+
+ std::vector<std::size_t> key_sizes_;
+ std::vector<std::size_t> state_offsets_;
+ std::size_t bucket_size_;
+
+ std::unordered_map<std::uint64_t, BucketIndex> index_;
+
+ std::size_t num_buckets_;
+ std::size_t buckets_allocated_;
+
+ ScopedBuffer keys_;
+ ScopedBuffer buckets_;
+
+ DISALLOW_COPY_AND_ASSIGN(ThreadPrivateNumericHashTable);
+};
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_THREAD_PRIVATE_NUMERIC_HASH_TABLE_HPP_