You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/04/25 00:56:15 UTC
[1/2] incubator-quickstep git commit: Add
ThreadPrivateCompactKeyHashTable as a fast path data structure for
aggregation. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/fix-common-subexpression c7333e02f -> 30021acf8 (forced update)
Add ThreadPrivateCompactKeyHashTable as a fast path data structure for aggregation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d6a01e7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d6a01e7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d6a01e7c
Branch: refs/heads/fix-common-subexpression
Commit: d6a01e7c867354ca05545595644a62b03de56b81
Parents: 8169306
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Fri Apr 21 23:23:13 2017 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Mon Apr 24 13:40:17 2017 -0500
----------------------------------------------------------------------
query_optimizer/ExecutionGenerator.cpp | 14 +-
.../cost_model/StarSchemaSimpleCostModel.cpp | 71 +++-
.../cost_model/StarSchemaSimpleCostModel.hpp | 12 +
storage/AggregationOperationState.cpp | 68 ++-
storage/AggregationOperationState.hpp | 6 +-
storage/CMakeLists.txt | 24 ++
storage/CollisionFreeVectorTable.hpp | 4 +
storage/HashTable.proto | 1 +
storage/HashTableBase.hpp | 18 +-
storage/HashTableFactory.hpp | 13 +-
storage/HashTablePool.hpp | 9 +
storage/PackedPayloadHashTable.hpp | 4 +
storage/ThreadPrivateCompactKeyHashTable.cpp | 421 +++++++++++++++++++
storage/ThreadPrivateCompactKeyHashTable.hpp | 230 ++++++++++
14 files changed, 870 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 3e0f647..9625a91 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1580,14 +1580,22 @@ void ExecutionGenerator::convertAggregate(
->canUseCollisionFreeAggregation(physical_plan,
estimated_num_groups,
&max_num_groups)) {
+ // First option: use array-based aggregation if applicable.
aggr_state_proto->set_hash_table_impl_type(
serialization::HashTableImplType::COLLISION_FREE_VECTOR);
aggr_state_proto->set_estimated_num_entries(max_num_groups);
use_parallel_initialization = true;
} else {
- // Otherwise, use SeparateChaining.
- aggr_state_proto->set_hash_table_impl_type(
- serialization::HashTableImplType::SEPARATE_CHAINING);
+ if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation(
+ physical_plan, estimated_num_groups)) {
+ // Second option: use thread-private compact-key aggregation if applicable.
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY);
+ } else {
+ // Otherwise, use SeparateChaining.
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ }
aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index b17fac0..e0e3dff 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -71,8 +71,8 @@ namespace optimizer {
namespace cost {
DEFINE_int64(collision_free_vector_table_max_size, 1000000000,
- "The maximum allowed key range (number of entries) for using a "
- "CollisionFreeVectorTable.");
+ "The maximum allowed key range (number of entries) for using a "
+ "CollisionFreeVectorTable.");
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
@@ -700,6 +700,73 @@ bool StarSchemaSimpleCostModel::canUseCollisionFreeAggregation(
return true;
}
+bool StarSchemaSimpleCostModel::canUseTwoPhaseCompactKeyAggregation(
+    const physical::AggregatePtr &aggregate,
+    const std::size_t estimated_num_groups) {
+  // The fast path only pays off when the (estimated) number of groups is small.
+  //
+  // TODO(jianqiao): Ideally this threshold would be the same value as
+  // FLAGS_partition_aggregation_num_groups_threshold, which lives in
+  // AggregationOperationState.cpp. There is currently no shared place where a
+  // flag can be seen by both the optimizer and the storage backend, so the
+  // threshold is hardcoded here until that is sorted out.
+  constexpr std::size_t kMaxNumGroupsForCompactKey = 10000u;
+  if (estimated_num_groups >= kMaxNumGroupsForCompactKey) {
+    return false;
+  }
+
+  // Every grouping key must be fixed-length and non-nullable, and all keys
+  // together must fit inside one 64-bit code.
+  std::size_t packed_key_bytes = 0;
+  for (const auto &grouping_expr : aggregate->grouping_expressions()) {
+    const Type &key_type = grouping_expr->getValueType();
+    if (key_type.isVariableLength() || key_type.isNullable()) {
+      return false;
+    }
+    packed_key_bytes += key_type.maximumByteLength();
+  }
+  if (packed_key_bytes > sizeof(std::uint64_t)) {
+    return false;
+  }
+
+  // Every aggregate must be a non-DISTINCT COUNT, or a non-DISTINCT SUM over an
+  // INT/LONG/FLOAT/DOUBLE argument, and no argument may be nullable.
+  for (const auto &alias : aggregate->aggregate_expressions()) {
+    const E::AggregateFunctionPtr function =
+        std::static_pointer_cast<const E::AggregateFunction>(alias->expression());
+
+    if (function->is_distinct()) {
+      return false;
+    }
+
+    const auto &args = function->getArguments();
+    for (const auto &arg : args) {
+      if (arg->getValueType().isNullable()) {
+        return false;
+      }
+    }
+
+    const auto agg_id = function->getAggregate().getAggregationID();
+    if (agg_id == AggregationID::kCount) {
+      continue;
+    }
+    if (agg_id != AggregationID::kSum) {
+      return false;
+    }
+
+    DCHECK_EQ(1u, args.size());
+    if (!QUICKSTEP_EQUALS_ANY_CONSTANT(args.front()->getValueType().getTypeID(),
+                                       kInt, kLong, kFloat, kDouble)) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
} // namespace cost
} // namespace optimizer
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index 0461077..99518cf 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -184,6 +184,18 @@ class StarSchemaSimpleCostModel : public CostModel {
const std::size_t estimated_num_groups,
std::size_t *max_num_groups);
+ /**
+ * @brief Checks whether an aggregate node can be efficiently evaluated with
+ * the two-phase (thread-private, then merged) compact key aggregation
+ * fast path.
+ *
+ * All of the following must hold:
+ * - the estimated number of groups is below a hardcoded threshold
+ * (currently 10000);
+ * - every grouping key is fixed-length and non-nullable, and the keys'
+ * combined maximum byte length fits in 8 bytes (one 64-bit code);
+ * - every aggregate is non-DISTINCT, has no nullable arguments, and is
+ * either COUNT or a SUM over an INT/LONG/FLOAT/DOUBLE argument.
+ *
+ * @param aggregate The physical aggregate node to be checked.
+ * @param estimated_num_groups The estimated number of groups for the aggregate.
+ * @return A bool value indicating whether two-phase compact key aggregation
+ * can be used to evaluate \p aggregate.
+ */
+ bool canUseTwoPhaseCompactKeyAggregation(const physical::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups);
+
private:
std::size_t estimateCardinalityForAggregate(
const physical::AggregatePtr &physical_plan);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index e5dc93e..0f4a105 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -48,6 +48,7 @@
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/SubBlocksReference.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
#include "storage/TupleIdSequence.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
@@ -69,10 +70,10 @@ namespace quickstep {
DEFINE_int32(num_aggregation_partitions,
41,
"The number of partitions used for performing the aggregation");
-DEFINE_int32(partition_aggregation_num_groups_threshold,
- 500000,
- "The threshold used for deciding whether the aggregation is done "
- "in a partitioned way or not");
+DEFINE_uint64(partition_aggregation_num_groups_threshold,
+ 100000,
+ "The threshold used for deciding whether the aggregation is done "
+ "in a partitioned way or not");
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
@@ -94,11 +95,16 @@ AggregationOperationState::AggregationOperationState(
!is_distinct_.empty(), std::logical_and<bool>())),
storage_manager_(storage_manager) {
if (!group_by.empty()) {
- if (hash_table_impl_type == HashTableImplType::kCollisionFreeVector) {
- is_aggregate_collision_free_ = true;
- } else {
- is_aggregate_partitioned_ = checkAggregatePartitioned(
- estimated_num_entries, is_distinct_, group_by, aggregate_functions);
+ switch (hash_table_impl_type) {
+ case HashTableImplType::kCollisionFreeVector:
+ is_aggregate_collision_free_ = true;
+ break;
+ case HashTableImplType::kThreadPrivateCompactKey:
+ is_aggregate_partitioned_ = false;
+ break;
+ default:
+ is_aggregate_partitioned_ = checkAggregatePartitioned(
+ estimated_num_entries, is_distinct_, group_by, aggregate_functions);
}
}
@@ -420,9 +426,7 @@ bool AggregationOperationState::checkAggregatePartitioned(
// There are GROUP BYs without DISTINCT. Check if the estimated number of
// groups is large enough to warrant a partitioned aggregation.
- return estimated_num_groups >=
- static_cast<std::size_t>(
- FLAGS_partition_aggregation_num_groups_threshold);
+ return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold;
}
std::size_t AggregationOperationState::getNumInitializationPartitions() const {
@@ -715,7 +719,18 @@ void AggregationOperationState::finalizeHashTable(
finalizeHashTableImplPartitioned(partition_id, output_destination);
} else {
DCHECK_EQ(0u, partition_id);
- finalizeHashTableImplThreadPrivate(output_destination);
+ DCHECK(group_by_hashtable_pool_ != nullptr);
+ switch (group_by_hashtable_pool_->getHashTableImplType()) {
+ case HashTableImplType::kSeparateChaining:
+ finalizeHashTableImplThreadPrivatePackedPayload(output_destination);
+ break;
+ case HashTableImplType::kThreadPrivateCompactKey:
+ finalizeHashTableImplThreadPrivateCompactKey(output_destination);
+ break;
+ default:
+ LOG(FATAL) << "Unexpected hash table type in "
+ << "AggregationOperationState::finalizeHashTable()";
+ }
}
}
@@ -840,7 +855,7 @@ void AggregationOperationState::finalizeHashTableImplPartitioned(
output_destination->bulkInsertTuples(&complete_result);
}
-void AggregationOperationState::finalizeHashTableImplThreadPrivate(
+void AggregationOperationState::finalizeHashTableImplThreadPrivatePackedPayload(
InsertDestination *output_destination) {
// TODO(harshad) - The merge phase may be slower when each hash table contains
// large number of entries. We should find ways in which we can perform a
@@ -948,6 +963,31 @@ void AggregationOperationState::finalizeHashTableImplThreadPrivate(
output_destination->bulkInsertTuples(&complete_result);
}
+void AggregationOperationState::finalizeHashTableImplThreadPrivateCompactKey(
+    InsertDestination *output_destination) {
+  // All hash tables collected by the pool; nothing to emit if there are none.
+  auto *tables = group_by_hashtable_pool_->getAllHashTables();
+  DCHECK(tables != nullptr);
+  if (tables->empty()) {
+    return;
+  }
+
+  // Take ownership of the last table and fold every other table into it. Each
+  // source table is owned by a local unique_ptr and freed once merged.
+  const std::size_t last = tables->size() - 1;
+  std::unique_ptr<ThreadPrivateCompactKeyHashTable> merged(
+      static_cast<ThreadPrivateCompactKeyHashTable*>((*tables)[last].release()));
+  for (std::size_t idx = 0; idx != last; ++idx) {
+    std::unique_ptr<AggregationStateHashTableBase> source((*tables)[idx].release());
+    merged->mergeFrom(
+        static_cast<const ThreadPrivateCompactKeyHashTable&>(*source));
+  }
+
+  // Materialize the merged groups and bulk-insert them into the output.
+  ColumnVectorsValueAccessor result;
+  merged->finalize(&result);
+  output_destination->bulkInsertTuples(&result);
+}
+
std::size_t AggregationOperationState::getMemoryConsumptionBytes() const {
std::size_t memory = getMemoryConsumptionBytesHelper(distinctify_hashtables_);
memory += getMemoryConsumptionBytesHelper(group_by_hashtables_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index e6af494..207c4f0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -256,7 +256,11 @@ class AggregationOperationState {
void finalizeHashTableImplPartitioned(const std::size_t partition_id,
InsertDestination *output_destination);
- void finalizeHashTableImplThreadPrivate(InsertDestination *output_destination);
+ void finalizeHashTableImplThreadPrivatePackedPayload(
+ InsertDestination *output_destination);
+
+ void finalizeHashTableImplThreadPrivateCompactKey(
+ InsertDestination *output_destination);
std::size_t getMemoryConsumptionBytesHelper(
const std::vector<std::unique_ptr<AggregationStateHashTableBase>>
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index c3db584..4296ba0 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -250,6 +250,9 @@ add_library(quickstep_storage_StorageManager StorageManager.cpp StorageManager.h
add_library(quickstep_storage_SubBlockTypeRegistry SubBlockTypeRegistry.cpp SubBlockTypeRegistry.hpp)
add_library(quickstep_storage_SubBlockTypeRegistryMacros ../empty_src.cpp SubBlockTypeRegistryMacros.hpp)
add_library(quickstep_storage_SubBlocksReference ../empty_src.cpp SubBlocksReference.hpp)
+add_library(quickstep_storage_ThreadPrivateCompactKeyHashTable
+ ThreadPrivateCompactKeyHashTable.cpp
+ ThreadPrivateCompactKeyHashTable.hpp)
add_library(quickstep_storage_TupleIdSequence ../empty_src.cpp TupleIdSequence.hpp)
add_library(quickstep_storage_TupleReference ../empty_src.cpp TupleReference.hpp)
add_library(quickstep_storage_TupleStorageSubBlock TupleStorageSubBlock.cpp TupleStorageSubBlock.hpp)
@@ -288,6 +291,7 @@ target_link_libraries(quickstep_storage_AggregationOperationState
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
quickstep_storage_SubBlocksReference
+ quickstep_storage_ThreadPrivateCompactKeyHashTable
quickstep_storage_TupleIdSequence
quickstep_storage_TupleStorageSubBlock
quickstep_storage_ValueAccessor
@@ -724,6 +728,7 @@ target_link_libraries(quickstep_storage_HashTableFactory
quickstep_storage_PackedPayloadHashTable
quickstep_storage_SeparateChainingHashTable
quickstep_storage_SimpleScalarSeparateChainingHashTable
+ quickstep_storage_ThreadPrivateCompactKeyHashTable
quickstep_storage_TupleReference
quickstep_types_Type
quickstep_types_TypeFactory
@@ -1039,6 +1044,24 @@ target_link_libraries(quickstep_storage_SubBlockTypeRegistry
target_link_libraries(quickstep_storage_SubBlocksReference
glog
quickstep_utility_PtrVector)
+target_link_libraries(quickstep_storage_ThreadPrivateCompactKeyHashTable
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationHandle
+ quickstep_expressions_aggregation_AggregationID
+ quickstep_storage_HashTableBase
+ quickstep_storage_StorageBlob
+ quickstep_storage_StorageBlockInfo
+ quickstep_storage_StorageConstants
+ quickstep_storage_StorageManager
+ quickstep_storage_ValueAccessorMultiplexer
+ quickstep_storage_ValueAccessorUtil
+ quickstep_types_Type
+ quickstep_types_TypeID
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_utility_Macros
+ quickstep_utility_ScopedBuffer)
target_link_libraries(quickstep_storage_TupleIdSequence
quickstep_storage_StorageBlockInfo
quickstep_utility_BitVector
@@ -1164,6 +1187,7 @@ target_link_libraries(quickstep_storage
quickstep_storage_SubBlockTypeRegistry
quickstep_storage_SubBlockTypeRegistryMacros
quickstep_storage_SubBlocksReference
+ quickstep_storage_ThreadPrivateCompactKeyHashTable
quickstep_storage_TupleIdSequence
quickstep_storage_TupleReference
quickstep_storage_TupleStorageSubBlock
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/CollisionFreeVectorTable.hpp
----------------------------------------------------------------------
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 490a5cc..221a221 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -70,6 +70,10 @@ class CollisionFreeVectorTable : public AggregationStateHashTableBase {
~CollisionFreeVectorTable() override;
+  // Identifies this table as the collision-free vector implementation.
+  HashTableImplType getImplType() const override { return HashTableImplType::kCollisionFreeVector; }
+
void destroyPayload() override;
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTable.proto
----------------------------------------------------------------------
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index 6839ebc..ed383df 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -26,6 +26,7 @@ enum HashTableImplType {
LINEAR_OPEN_ADDRESSING = 1;
SEPARATE_CHAINING = 2;
SIMPLE_SCALAR_SEPARATE_CHAINING = 3;
+ THREAD_PRIVATE_COMPACT_KEY = 4;
}
// NOTE(chasseur): This proto describes the run-time parameters for a resizable
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTableBase.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableBase.hpp b/storage/HashTableBase.hpp
index 8be388a..4d9310c 100644
--- a/storage/HashTableBase.hpp
+++ b/storage/HashTableBase.hpp
@@ -44,7 +44,8 @@ enum class HashTableImplType {
kCollisionFreeVector,
kLinearOpenAddressing,
kSeparateChaining,
- kSimpleScalarSeparateChaining
+ kSimpleScalarSeparateChaining,
+ kThreadPrivateCompactKey
};
/**
@@ -113,8 +114,23 @@ class AggregationStateHashTableBase {
const std::vector<MultiSourceAttributeId> &key_attr_ids,
const ValueAccessorMultiplexer &accessor_mux) = 0;
+ /**
+ * @brief Destroy hash table payloads.
+ */
virtual void destroyPayload() = 0;
+ /**
+ * @brief Get the implementation type of this aggregation hash table.
+ *
+ * @return The implementation type of this aggregation hash table.
+ */
+ virtual HashTableImplType getImplType() const = 0;
+
+ /**
+ * @brief Get the estimated memory consumption of this hash table in bytes.
+ *
+ * @return The estimated memory consumption of this hash table in bytes.
+ */
virtual std::size_t getMemoryConsumptionBytes() const = 0;
protected:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTableFactory.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index 9686429..cb1f16f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -32,6 +32,7 @@
#include "storage/PackedPayloadHashTable.hpp"
#include "storage/SeparateChainingHashTable.hpp"
#include "storage/SimpleScalarSeparateChainingHashTable.hpp"
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
#include "storage/TupleReference.hpp"
#include "types/TypeFactory.hpp"
#include "utility/BloomFilter.hpp"
@@ -123,6 +124,8 @@ inline HashTableImplType HashTableImplTypeFromProto(
return HashTableImplType::kSeparateChaining;
case serialization::HashTableImplType::SIMPLE_SCALAR_SEPARATE_CHAINING:
return HashTableImplType::kSimpleScalarSeparateChaining;
+ case serialization::HashTableImplType::THREAD_PRIVATE_COMPACT_KEY:
+ return HashTableImplType::kThreadPrivateCompactKey;
default: {
LOG(FATAL) << "Unrecognized serialization::HashTableImplType\n";
}
@@ -355,7 +358,6 @@ class AggregationStateHashTableFactory {
* hash table constructor.
* @return A new aggregation state hash table.
**/
-
static AggregationStateHashTableBase* CreateResizable(
const HashTableImplType hash_table_type,
const std::vector<const Type*> &key_types,
@@ -363,13 +365,16 @@ class AggregationStateHashTableFactory {
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager) {
switch (hash_table_type) {
- case HashTableImplType::kSeparateChaining:
- return new PackedPayloadHashTable(
- key_types, num_entries, handles, storage_manager);
case HashTableImplType::kCollisionFreeVector:
DCHECK_EQ(1u, key_types.size());
return new CollisionFreeVectorTable(
key_types.front(), num_entries, handles, storage_manager);
+ case HashTableImplType::kSeparateChaining:
+ return new PackedPayloadHashTable(
+ key_types, num_entries, handles, storage_manager);
+ case HashTableImplType::kThreadPrivateCompactKey:
+ return new ThreadPrivateCompactKeyHashTable(
+ key_types, num_entries, handles, storage_manager);
default: {
LOG(FATAL) << "Unrecognized HashTableImplType in "
<< "AggregationStateHashTableFactory::createResizable()";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/HashTablePool.hpp
----------------------------------------------------------------------
diff --git a/storage/HashTablePool.hpp b/storage/HashTablePool.hpp
index 6dbd7f9..f3abddb 100644
--- a/storage/HashTablePool.hpp
+++ b/storage/HashTablePool.hpp
@@ -76,6 +76,15 @@ class HashTablePool {
storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
/**
+ * @brief Get the type of hash table implementation for this pool.
+ *
+ * @return The type of hash table implementation for this pool.
+ */
+  HashTableImplType getHashTableImplType() const { return hash_table_impl_type_; }
+
+ /**
* @brief Check out a hash table for insertion.
*
* @note This method is relevant for specialized (for aggregation)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/PackedPayloadHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 960d5a7..3e89aab 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -88,6 +88,10 @@ class PackedPayloadHashTable : public AggregationStateHashTableBase {
~PackedPayloadHashTable() override;
+  // This packed-payload table is reported as the separate-chaining type.
+  HashTableImplType getImplType() const override { return HashTableImplType::kSeparateChaining; }
+
/**
* @brief Erase all entries in this hash table.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/ThreadPrivateCompactKeyHashTable.cpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.cpp b/storage/ThreadPrivateCompactKeyHashTable.cpp
new file mode 100644
index 0000000..fb68940
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.cpp
@@ -0,0 +1,421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "storage/ThreadPrivateCompactKeyHashTable.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <type_traits>
+#include <vector>
+
+#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "types/Type.hpp"
+#include "types/TypeID.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/ScopedBuffer.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+namespace {
+
+// Calls `functor` with the runtime key width (1 to 8 bytes) lifted into a
+// compile-time std::integral_constant, so the functor can be instantiated
+// separately for each width. Any other width is a fatal error.
+template <typename FunctorT>
+auto InvokeOnKeySize(const std::size_t key_size, const FunctorT &functor) {
+  switch (key_size) {
+    case 1: return functor(std::integral_constant<std::size_t, 1>());
+    case 2: return functor(std::integral_constant<std::size_t, 2>());
+    case 3: return functor(std::integral_constant<std::size_t, 3>());
+    case 4: return functor(std::integral_constant<std::size_t, 4>());
+    case 5: return functor(std::integral_constant<std::size_t, 5>());
+    case 6: return functor(std::integral_constant<std::size_t, 6>());
+    case 7: return functor(std::integral_constant<std::size_t, 7>());
+    case 8: return functor(std::integral_constant<std::size_t, 8>());
+    default: break;
+  }
+  LOG(FATAL) << "Unexpected key size: " << key_size;
+}
+
+} // namespace
+
+// Out-of-line definition of the static constexpr data member. Before C++17
+// this definition is required if kKeyCodeSize is ever odr-used (e.g. bound to
+// a const reference by a macro or template).
+constexpr std::size_t ThreadPrivateCompactKeyHashTable::kKeyCodeSize;
+
+ThreadPrivateCompactKeyHashTable::ThreadPrivateCompactKeyHashTable(
+    const std::vector<const Type*> &key_types,
+    const std::size_t num_entries,
+    const std::vector<AggregationHandle*> &handles,
+    StorageManager *storage_manager)
+    : key_types_(key_types),
+      handles_(handles),
+      total_state_size_(0),
+      num_buckets_(0),
+      buckets_allocated_(0),
+      storage_manager_(storage_manager) {
+  // Cache the byte width of each (fixed-length, non-nullable) key component;
+  // these widths determine where each component is packed inside a KeyCode.
+  for (const Type *key_type : key_types) {
+    DCHECK(!key_type->isVariableLength());
+    DCHECK(!key_type->isNullable());
+    key_sizes_.emplace_back(key_type->maximumByteLength());
+  }
+
+  // Determine the per-bucket state width of each aggregate: COUNT and
+  // integral SUM accumulate into an int64, floating-point SUM into a double.
+  for (const AggregationHandle *handle : handles) {
+    // Bind by const reference to avoid copying the argument type vector.
+    const std::vector<const Type*> &arg_types = handle->getArgumentTypes();
+    DCHECK_LE(arg_types.size(), 1u);
+    DCHECK(arg_types.empty() || !arg_types.front()->isNullable());
+
+    std::size_t state_size = 0;
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        state_size = sizeof(std::int64_t);
+        break;
+      }
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, arg_types.size());
+        switch (arg_types.front()->getTypeID()) {
+          case TypeID::kInt:  // Fall through
+          case TypeID::kLong:
+            state_size = sizeof(std::int64_t);
+            break;
+          case TypeID::kFloat:  // Fall through
+          case TypeID::kDouble:
+            state_size = sizeof(double);
+            break;
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+    state_sizes_.emplace_back(state_size);
+    total_state_size_ += state_size;
+  }
+
+  // Each bucket holds one KeyCode plus one state slot per aggregate.
+  //
+  // Always request room for at least one bucket. With num_entries == 0 the
+  // requested byte count would be 0, which yields a zero-bucket table; since
+  // resize() grows by doubling num_buckets_, such a table could never grow
+  // and the first insertion would write past the end of the blob.
+  const std::size_t bucket_size = kKeyCodeSize + total_state_size_;
+  const std::size_t initial_num_entries = std::max<std::size_t>(num_entries, 1u);
+  const std::size_t num_storage_slots =
+      storage_manager_->SlotsNeededForBytes(initial_num_entries * bucket_size);
+
+  // Use the storage manager to allocate the backing memory.
+  const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
+  blob_ = storage_manager_->getBlobMutable(blob_id);
+
+  // The blob is sized in whole slots, so it may hold more buckets than
+  // requested; use all of them.
+  num_buckets_ = blob_->size() / bucket_size;
+  DCHECK_GT(num_buckets_, 0u);
+  void *memory = blob_->getMemoryMutable();
+
+  // Memory layout:
+  //   [keys: num_buckets_ KeyCodes][state vector 0][state vector 1]...
+  // where state vector i holds num_buckets_ entries of state_sizes_[i] bytes.
+  // All states start at zero (the identity for both COUNT and SUM).
+  keys_ = static_cast<KeyCode*>(memory);
+  char *state_memory = static_cast<char*>(memory) + num_buckets_ * kKeyCodeSize;
+  std::memset(state_memory, 0, num_buckets_ * total_state_size_);
+
+  for (const auto state_size : state_sizes_) {
+    state_vecs_.emplace_back(state_memory);
+    state_memory += num_buckets_ * state_size;
+  }
+}
+
+ThreadPrivateCompactKeyHashTable::~ThreadPrivateCompactKeyHashTable() {
+  // Nothing to give back if the blob reference is not valid.
+  if (!blob_.valid()) {
+    return;
+  }
+
+  // Drop our reference first, then have the storage manager delete the blob.
+  const block_id id_to_delete = blob_->getID();
+  blob_.release();
+  storage_manager_->deleteBlockOrBlobFile(id_to_delete);
+}
+
+void ThreadPrivateCompactKeyHashTable::resize() {
+  // Only invoked once every existing bucket is in use.
+  DCHECK_EQ(buckets_allocated_, num_buckets_);
+
+  // Allocate a blob large enough for (at least) twice the current buckets.
+  const std::size_t bucket_size = kKeyCodeSize + total_state_size_;
+  const std::size_t grown_num_slots =
+      storage_manager_->SlotsNeededForBytes(num_buckets_ * 2 * bucket_size);
+  MutableBlobReference grown_blob =
+      storage_manager_->getBlobMutable(storage_manager_->createBlob(grown_num_slots));
+
+  const std::size_t grown_num_buckets = grown_blob->size() / bucket_size;
+  void *grown_memory = grown_blob->getMemoryMutable();
+
+  // Keys sit at the front of the blob: copy over the in-use ones.
+  KeyCode *grown_keys = static_cast<KeyCode*>(grown_memory);
+  std::memcpy(grown_keys, keys_, buckets_allocated_ * kKeyCodeSize);
+  keys_ = grown_keys;
+
+  // State vectors follow the keys, each now sized for grown_num_buckets
+  // entries: copy the in-use prefix of each old vector and zero the rest.
+  char *dst = static_cast<char*>(grown_memory) + grown_num_buckets * kKeyCodeSize;
+  for (std::size_t i = 0; i < state_sizes_.size(); ++i) {
+    const std::size_t used_bytes = buckets_allocated_ * state_sizes_[i];
+    const std::size_t total_bytes = grown_num_buckets * state_sizes_[i];
+
+    std::memcpy(dst, state_vecs_[i], used_bytes);
+    std::memset(dst + used_bytes, 0, total_bytes - used_bytes);
+
+    state_vecs_[i] = dst;
+    dst += total_bytes;
+  }
+
+  // Install the grown blob; after the swap, grown_blob refers to the old blob,
+  // which is then released and deleted.
+  std::swap(blob_, grown_blob);
+  num_buckets_ = grown_num_buckets;
+
+  const block_id old_blob_id = grown_blob->getID();
+  grown_blob.release();
+  storage_manager_->deleteBlockOrBlobFile(old_blob_id);
+}
+
+bool ThreadPrivateCompactKeyHashTable::upsertValueAccessorCompositeKey(
+    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+    const std::vector<MultiSourceAttributeId> &key_attr_ids,
+    const ValueAccessorMultiplexer &accessor_mux) {
+  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+  ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+
+  DCHECK(base_accessor != nullptr);
+  const std::size_t num_tuples = base_accessor->getNumTuplesVirtual();
+
+  // Phase 1: pack each tuple's key components into a single KeyCode. Key
+  // component i occupies key_sizes_[i] bytes at an offset equal to the sum of
+  // the preceding components' widths.
+  ScopedBuffer buffer(num_tuples * kKeyCodeSize);
+  KeyCode *key_codes = static_cast<KeyCode*>(buffer.get());
+  std::size_t key_code_offset = 0;
+  for (std::size_t i = 0; i < key_attr_ids.size(); ++i) {
+    const auto &key_attr_id = key_attr_ids[i];
+    ValueAccessor *accessor =
+        key_attr_id.source == ValueAccessorSource::kBase
+            ? base_accessor
+            : derived_accessor;
+    DCHECK(accessor != nullptr);
+
+    // Pack the key component into the 64-bit code (with proper offset).
+    InvokeOnKeySize(
+        key_sizes_[i],
+        [&](auto key_size) -> void {  // NOLINT(build/c++11)
+          ConstructKeyCode<decltype(key_size)::value>(
+              key_code_offset, key_attr_id.attr_id, accessor, key_codes);
+        });
+    key_code_offset += key_sizes_[i];
+  }
+
+  // Phase 2: map every tuple's KeyCode to a bucket. Keys not seen before get
+  // the next unused bucket; storage is grown first if all buckets are taken.
+  std::vector<BucketIndex> bucket_indices(num_tuples);
+  for (std::size_t i = 0; i < num_tuples; ++i) {
+    // Keep the full KeyCode width (as mergeFrom() does). Narrowing to
+    // std::size_t would truncate packed keys wider than size_t on platforms
+    // where size_t is 32 bits, merging distinct groups together.
+    const KeyCode code = key_codes[i];
+    const auto index_it = index_.find(code);
+    if (index_it == index_.end()) {
+      if (buckets_allocated_ >= num_buckets_) {
+        resize();
+      }
+      index_.emplace(code, buckets_allocated_);
+      bucket_indices[i] = buckets_allocated_;
+      keys_[buckets_allocated_] = code;
+      ++buckets_allocated_;
+    } else {
+      bucket_indices[i] = index_it->second;
+    }
+  }
+
+  // Phase 3: update each aggregate's state vector, dispatching on
+  // AggregationID and argument type.
+  // TODO(jianqiao): refactor type system and aggregation facilities to eliminate
+  // this type of ad-hoc switch statements.
+  for (std::size_t i = 0; i < handles_.size(); ++i) {
+    const AggregationHandle *handle = handles_[i];
+    switch (handle->getAggregationID()) {
+      case AggregationID::kCount: {
+        upsertValueAccessorCount(bucket_indices, state_vecs_[i]);
+        break;
+      }
+      case AggregationID::kSum: {
+        DCHECK_EQ(1u, argument_ids[i].size());
+        const auto &argument_id = argument_ids[i].front();
+        ValueAccessor *accessor =
+            argument_id.source == ValueAccessorSource::kBase
+                ? base_accessor
+                : derived_accessor;
+        DCHECK(accessor != nullptr);
+
+        DCHECK_EQ(1u, handle->getArgumentTypes().size());
+        const Type *argument_type = handle->getArgumentTypes().front();
+        switch (argument_type->getTypeID()) {
+          case kInt: {
+            upsertValueAccessorSum<int, std::int64_t>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          case kLong: {
+            upsertValueAccessorSum<std::int64_t, std::int64_t>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          case kFloat: {
+            upsertValueAccessorSum<float, double>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          case kDouble: {
+            upsertValueAccessorSum<double, double>(
+                bucket_indices, argument_id.attr_id, accessor, state_vecs_[i]);
+            break;
+          }
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+  }
+
+  return true;
+}
+
+void ThreadPrivateCompactKeyHashTable::mergeFrom(
+    const ThreadPrivateCompactKeyHashTable &source) {
+  // Phase 1: translate every bucket of `source` into a bucket of this table.
+  // Afterwards, source bucket b is to be folded into local bucket
+  // bucket_map[b]; keys not yet present here get freshly allocated buckets.
+  const std::size_t num_src_buckets = source.buckets_allocated_;
+  std::vector<BucketIndex> bucket_map(num_src_buckets);
+  for (std::size_t src_bucket = 0; src_bucket < num_src_buckets; ++src_bucket) {
+    const KeyCode key = source.keys_[src_bucket];
+    const auto existing = index_.find(key);
+    if (existing != index_.end()) {
+      bucket_map[src_bucket] = existing->second;
+      continue;
+    }
+
+    if (buckets_allocated_ >= num_buckets_) {
+      resize();
+    }
+    index_.emplace(key, buckets_allocated_);
+    bucket_map[src_bucket] = buckets_allocated_;
+    keys_[buckets_allocated_] = key;
+    ++buckets_allocated_;
+  }
+
+  // Phase 2: fold the source states into ours, one aggregate column at a time.
+  for (std::size_t agg = 0; agg < handles_.size(); ++agg) {
+    const AggregationHandle &handle = *handles_[agg];
+    const void *src_states = source.state_vecs_[agg];
+    void *dst_states = state_vecs_[agg];
+
+    switch (handle.getAggregationID()) {
+      case AggregationID::kCount:
+        mergeStateSum<std::int64_t>(bucket_map, src_states, dst_states);
+        break;
+      case AggregationID::kSum:
+        switch (handle.getArgumentTypes().front()->getTypeID()) {
+          case kInt:  // Integral SUM states are accumulated as int64.
+          case kLong:
+            mergeStateSum<std::int64_t>(bucket_map, src_states, dst_states);
+            break;
+          case kFloat:  // Floating-point SUM states are accumulated as double.
+          case kDouble:
+            mergeStateSum<double>(bucket_map, src_states, dst_states);
+            break;
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+  }
+}
+
+void ThreadPrivateCompactKeyHashTable::finalize(
+    ColumnVectorsValueAccessor *output) const {
+  // Emits one column per group-by key component, followed by one column per
+  // aggregate; every column has buckets_allocated_ rows in bucket order.
+
+  // Key columns: unpack each component from its byte offset in the KeyCodes.
+  std::size_t component_offset = 0;
+  for (std::size_t k = 0; k < key_types_.size(); ++k) {
+    const std::size_t component_size = key_sizes_[k];
+    auto key_column =
+        std::make_unique<NativeColumnVector>(*key_types_[k], buckets_allocated_);
+    InvokeOnKeySize(
+        component_size,
+        [&](auto key_size) -> void {  // NOLINT(build/c++11)
+          this->finalizeKey<decltype(key_size)::value>(component_offset,
+                                                       key_column.get());
+        });
+    output->addColumn(key_column.release());
+    component_offset += component_size;
+  }
+
+  // Aggregate columns: convert each state vector into its result column.
+  for (std::size_t agg = 0; agg < handles_.size(); ++agg) {
+    const AggregationHandle &handle = *handles_[agg];
+    auto result_column = std::make_unique<NativeColumnVector>(
+        *handle.getResultType(), buckets_allocated_);
+    const void *states = state_vecs_[agg];
+
+    switch (handle.getAggregationID()) {
+      case AggregationID::kCount:
+        finalizeStateSum<std::int64_t, std::int64_t>(states, result_column.get());
+        break;
+      case AggregationID::kSum:
+        switch (handle.getArgumentTypes().front()->getTypeID()) {
+          case kInt:  // Fall through
+          case kLong:
+            finalizeStateSum<std::int64_t, std::int64_t>(states, result_column.get());
+            break;
+          case kFloat:  // Fall through
+          case kDouble:
+            finalizeStateSum<double, double>(states, result_column.get());
+            break;
+          default:
+            LOG(FATAL) << "Unexpected argument type";
+        }
+        break;
+      default:
+        LOG(FATAL) << "Unexpected AggregationID";
+    }
+    output->addColumn(result_column.release());
+  }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d6a01e7c/storage/ThreadPrivateCompactKeyHashTable.hpp
----------------------------------------------------------------------
diff --git a/storage/ThreadPrivateCompactKeyHashTable.hpp b/storage/ThreadPrivateCompactKeyHashTable.hpp
new file mode 100644
index 0000000..277e2e5
--- /dev/null
+++ b/storage/ThreadPrivateCompactKeyHashTable.hpp
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+#define QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <cstring>
+#include <unordered_map>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/HashTableBase.hpp"
+#include "storage/StorageBlob.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregationHandle;
+class StorageManager;
+class Type;
+
+/**
+ * @brief Specialized aggregation hash table that is preferable for two-phase
+ * aggregation with small-cardinality group-by keys. To use this hash
+ * table, it also requires that the group-by keys have fixed-length types
+ * with total byte size no greater than 8 (so that the keys can be packed
+ * into a 64-bit QWORD).
+ */
+class ThreadPrivateCompactKeyHashTable : public AggregationStateHashTableBase {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param key_types A vector of one or more types (>1 indicates a composite
+   *        key).
+   * @param num_entries The estimated number of entries this hash table will
+   *        hold.
+   * @param handles The aggregation handles.
+   * @param storage_manager The StorageManager to use (a StorageBlob will be
+   *        allocated to hold this hash table's contents).
+   **/
+  ThreadPrivateCompactKeyHashTable(
+      const std::vector<const Type*> &key_types,
+      const std::size_t num_entries,
+      const std::vector<AggregationHandle*> &handles,
+      StorageManager *storage_manager);
+
+  ~ThreadPrivateCompactKeyHashTable() override;
+
+  HashTableImplType getImplType() const override {
+    return HashTableImplType::kThreadPrivateCompactKey;
+  }
+
+  // Intentionally a no-op: no per-entry payload cleanup is performed here.
+  void destroyPayload() override {}
+
+  /**
+   * @return The size in bytes of the StorageBlob backing this hash table.
+   **/
+  std::size_t getMemoryConsumptionBytes() const override {
+    return blob_->size();
+  }
+
+  /**
+   * @return The number of entries in this HashTable.
+   **/
+  inline std::size_t numEntries() const {
+    return buckets_allocated_;
+  }
+
+  bool upsertValueAccessorCompositeKey(
+      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
+      const std::vector<MultiSourceAttributeId> &key_attr_ids,
+      const ValueAccessorMultiplexer &accessor_mux) override;
+
+  /**
+   * @brief Merge the states of \p source into this hash table.
+   *
+   * @param source The source hash table from which the states are to be
+   *        merged into this hash table.
+   **/
+  void mergeFrom(const ThreadPrivateCompactKeyHashTable &source);
+
+  /**
+   * @brief Finalize all the aggregation state vectors and add the result
+   *        column vectors into the output ColumnVectorsValueAccessor.
+   *
+   * @param output The ColumnVectorsValueAccessor to add all the result column
+   *        vectors into.
+   **/
+  void finalize(ColumnVectorsValueAccessor *output) const;
+
+ private:
+  // Compact key as a 64-bit QWORD.
+  using KeyCode = std::uint64_t;
+  static constexpr std::size_t kKeyCodeSize = sizeof(KeyCode);
+
+  using BucketIndex = std::uint32_t;
+
+  // Rounds actual_bytes up to the next multiple of kCacheLineBytes.
+  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
+    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+  }
+
+  // Grow the size of this hash table by a factor of 2.
+  void resize();
+
+  // For every tuple of accessor, copies key_size bytes of attribute attr_id
+  // to byte offset `offset` within that tuple's consecutive kKeyCodeSize-byte
+  // slot of the KeyCode array starting at key_code_start.
+  template <std::size_t key_size>
+  inline static void ConstructKeyCode(const std::size_t offset,
+                                      const attribute_id attr_id,
+                                      ValueAccessor *accessor,
+                                      void *key_code_start) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+          char *key_code_ptr = static_cast<char*>(key_code_start) + offset;
+          typed_accessor->beginIteration();
+          while (typed_accessor->next()) {
+            std::memcpy(key_code_ptr,
+                        typed_accessor->template getUntypedValue<false>(attr_id),
+                        key_size);
+            key_code_ptr += kKeyCodeSize;
+          }
+        });
+  }
+
+  // Increments the int64 COUNT state of bucket_indices[i] once per entry.
+  inline static void upsertValueAccessorCount(
+      const std::vector<BucketIndex> &bucket_indices,
+      void *state_vec) {
+    std::int64_t *states = static_cast<std::int64_t*>(state_vec);
+    for (const BucketIndex idx : bucket_indices) {
+      states[idx] += 1;
+    }
+  }
+
+  // Adds the ArgumentT value of attribute attr_id of the i-th tuple of
+  // accessor to the StateT state of bucket bucket_indices[i]. The result of
+  // next() is not checked, so accessor must hold at least
+  // bucket_indices.size() tuples.
+  // NOTE(review): getUntypedValue<false> presumably skips NULL checks --
+  // confirm that nullable arguments never reach this path.
+  template <typename ArgumentT, typename StateT>
+  inline static void upsertValueAccessorSum(
+      const std::vector<BucketIndex> &bucket_indices,
+      const attribute_id attr_id,
+      ValueAccessor *accessor,
+      void *state_vec) {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *typed_accessor) -> void {  // NOLINT(build/c++11)
+          typed_accessor->beginIteration();
+
+          StateT *states = static_cast<StateT*>(state_vec);
+          for (const BucketIndex idx : bucket_indices) {
+            typed_accessor->next();
+            states[idx] += *static_cast<const ArgumentT*>(
+                typed_accessor->template getUntypedValue<false>(attr_id));
+          }
+        });
+  }
+
+  // Adds source state i into destination state dst_bucket_indices[i] for
+  // every source bucket i.
+  template <typename StateT>
+  inline static void mergeStateSum(const std::vector<BucketIndex> &dst_bucket_indices,
+                                   const void *src_state_vec,
+                                   void *dst_state_vec) {
+    StateT *dst_states = static_cast<StateT*>(dst_state_vec);
+    const StateT *src_states = static_cast<const StateT*>(src_state_vec);
+    for (std::size_t i = 0; i < dst_bucket_indices.size(); ++i) {
+      dst_states[dst_bucket_indices[i]] += src_states[i];
+    }
+  }
+
+  // For every allocated bucket, copies the key_size bytes at byte offset
+  // `offset` of its KeyCode into the slot returned by
+  // output_cv->getPtrForDirectWrite().
+  template <std::size_t key_size>
+  inline void finalizeKey(const std::size_t offset,
+                          NativeColumnVector *output_cv) const {
+    const char *key_ptr = reinterpret_cast<const char*>(keys_) + offset;
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      std::memcpy(output_cv->getPtrForDirectWrite(), key_ptr, key_size);
+      key_ptr += kKeyCodeSize;
+    }
+  }
+
+  // For every allocated bucket, converts its StateT state to ResultT and
+  // stores it in the slot returned by output_cv->getPtrForDirectWrite().
+  template <typename StateT, typename ResultT>
+  inline void finalizeStateSum(const void *state_vec,
+                               NativeColumnVector *output_cv) const {
+    const StateT *states = static_cast<const StateT*>(state_vec);
+    for (std::size_t i = 0; i < buckets_allocated_; ++i) {
+      *static_cast<ResultT*>(output_cv->getPtrForDirectWrite()) = states[i];
+    }
+  }
+
+  const std::vector<const Type*> key_types_;
+  const std::vector<AggregationHandle*> handles_;
+
+  // key_sizes_[i] is the byte width of key component key_types_[i] within a
+  // KeyCode.
+  std::vector<std::size_t> key_sizes_;
+  std::vector<std::size_t> state_sizes_;
+  std::size_t total_state_size_;
+
+  // Bucket capacity, and the number of buckets currently in use.
+  std::size_t num_buckets_;
+  std::size_t buckets_allocated_;
+
+  // Maps a compact-key to its bucket location.
+  std::unordered_map<KeyCode, BucketIndex> index_;
+
+  // Compact-key array where keys_[i] holds the compact-key for bucket i.
+  KeyCode *keys_;
+
+  // Use a column-wise layout for aggregation states: state_vecs_[j] holds the
+  // states of handles_[j], indexed by bucket.
+  std::vector<void*> state_vecs_;
+
+  StorageManager *storage_manager_;
+  MutableBlobReference blob_;
+
+  DISALLOW_COPY_AND_ASSIGN(ThreadPrivateCompactKeyHashTable);
+};
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_STORAGE_THREAD_PRIVATE_COMPACT_KEY_HASH_TABLE_HPP_
[2/2] incubator-quickstep git commit: Fix the issues with the common
subexpression feature
Posted by zu...@apache.org.
Fix the issues with the common subexpression feature
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/30021acf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/30021acf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/30021acf
Branch: refs/heads/fix-common-subexpression
Commit: 30021acf8bdf6d33f6e940f3343760384d971830
Parents: d6a01e7
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Mon Apr 24 02:33:39 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Apr 24 17:55:22 2017 -0700
----------------------------------------------------------------------
expressions/predicate/Predicate.hpp | 6 +--
expressions/scalar/Scalar.cpp | 2 +-
expressions/scalar/Scalar.hpp | 5 +-
expressions/scalar/ScalarSharedExpression.cpp | 51 +++++++++-----------
expressions/scalar/ScalarSharedExpression.hpp | 13 +++--
.../tests/ScalarCaseExpression_unittest.cpp | 26 +++++++---
.../expressions/CommonSubexpression.hpp | 4 +-
query_optimizer/expressions/SimpleCase.cpp | 6 +--
query_optimizer/rules/CollapseSelection.cpp | 2 +-
query_optimizer/rules/CollapseSelection.hpp | 2 +
.../rules/ExtractCommonSubexpression.cpp | 2 +-
.../rules/ExtractCommonSubexpression.hpp | 2 +-
.../rules/ReuseAggregateExpressions.cpp | 7 +--
.../rules/ReuseAggregateExpressions.hpp | 2 +
14 files changed, 68 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/expressions/predicate/Predicate.hpp
----------------------------------------------------------------------
diff --git a/expressions/predicate/Predicate.hpp b/expressions/predicate/Predicate.hpp
index 6a2ba6d..df04644 100644
--- a/expressions/predicate/Predicate.hpp
+++ b/expressions/predicate/Predicate.hpp
@@ -65,11 +65,7 @@ class Predicate : public Expression {
**/
static const char *kPredicateTypeNames[kNumPredicateTypes];
- /**
- * @brief Virtual destructor.
- *
- **/
- virtual ~Predicate() {
+ ~Predicate() override {
}
std::string getName() const override {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/expressions/scalar/Scalar.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/Scalar.cpp b/expressions/scalar/Scalar.cpp
index c552d8b..da0fc1b 100644
--- a/expressions/scalar/Scalar.cpp
+++ b/expressions/scalar/Scalar.cpp
@@ -31,8 +31,8 @@ const char *Scalar::kScalarDataSourceNames[] = {
"Attribute",
"UnaryExpression",
"BinaryExpression",
- "SharedExpression",
- "SimpleCase"
+ "SimpleCase",
+ "SharedExpression"
};
const TypedValue& Scalar::getStaticValue() const {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/expressions/scalar/Scalar.hpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/Scalar.hpp b/expressions/scalar/Scalar.hpp
index 472b71c..6e482c2 100644
--- a/expressions/scalar/Scalar.hpp
+++ b/expressions/scalar/Scalar.hpp
@@ -69,10 +69,7 @@ class Scalar : public Expression {
**/
static const char *kScalarDataSourceNames[kNumScalarDataSources];
- /**
- * @brief Virtual destructor.
- **/
- virtual ~Scalar() {
+ ~Scalar() override {
}
std::string getName() const override {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/expressions/scalar/ScalarSharedExpression.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarSharedExpression.cpp b/expressions/scalar/ScalarSharedExpression.cpp
index f97c60b..e301116 100644
--- a/expressions/scalar/ScalarSharedExpression.cpp
+++ b/expressions/scalar/ScalarSharedExpression.cpp
@@ -34,19 +34,12 @@ namespace quickstep {
struct SubBlocksReference;
-ScalarSharedExpression::ScalarSharedExpression(const int share_id,
- Scalar *operand)
- : Scalar(operand->getType()),
- share_id_(share_id),
- operand_(operand) {
-}
-
serialization::Scalar ScalarSharedExpression::getProto() const {
serialization::Scalar proto;
proto.set_data_source(serialization::Scalar::SHARED_EXPRESSION);
proto.SetExtension(serialization::ScalarSharedExpression::share_id, share_id_);
proto.MutableExtension(serialization::ScalarSharedExpression::operand)
- ->CopyFrom(operand_->getProto());
+ ->MergeFrom(operand_->getProto());
return proto;
}
@@ -81,16 +74,16 @@ ColumnVectorPtr ScalarSharedExpression::getAllValues(
ColumnVectorCache *cv_cache) const {
if (cv_cache == nullptr) {
return operand_->getAllValues(accessor, sub_blocks_ref, cv_cache);
+ }
+
+ ColumnVectorPtr result;
+ if (cv_cache->contains(share_id_)) {
+ result = cv_cache->get(share_id_);
} else {
- ColumnVectorPtr result;
- if (cv_cache->contains(share_id_)) {
- result = cv_cache->get(share_id_);
- } else {
- result = operand_->getAllValues(accessor, sub_blocks_ref, cv_cache);
- cv_cache->set(share_id_, result);
- }
- return result;
+ result = operand_->getAllValues(accessor, sub_blocks_ref, cv_cache);
+ cv_cache->set(share_id_, result);
}
+ return result;
}
ColumnVectorPtr ScalarSharedExpression::getAllValuesForJoin(
@@ -107,21 +100,21 @@ ColumnVectorPtr ScalarSharedExpression::getAllValuesForJoin(
right_accessor,
joined_tuple_ids,
cv_cache);
+ }
+
+ ColumnVectorPtr result;
+ if (cv_cache->contains(share_id_)) {
+ result = cv_cache->get(share_id_);
} else {
- ColumnVectorPtr result;
- if (cv_cache->contains(share_id_)) {
- result = cv_cache->get(share_id_);
- } else {
- result = operand_->getAllValuesForJoin(left_relation_id,
- left_accessor,
- right_relation_id,
- right_accessor,
- joined_tuple_ids,
- cv_cache);
- cv_cache->set(share_id_, result);
- }
- return result;
+ result = operand_->getAllValuesForJoin(left_relation_id,
+ left_accessor,
+ right_relation_id,
+ right_accessor,
+ joined_tuple_ids,
+ cv_cache);
+ cv_cache->set(share_id_, result);
}
+ return result;
}
void ScalarSharedExpression::getFieldStringItems(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/expressions/scalar/ScalarSharedExpression.hpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/ScalarSharedExpression.hpp b/expressions/scalar/ScalarSharedExpression.hpp
index d5dddbc..f39c45b 100644
--- a/expressions/scalar/ScalarSharedExpression.hpp
+++ b/expressions/scalar/ScalarSharedExpression.hpp
@@ -53,11 +53,16 @@ class ScalarSharedExpression : public Scalar {
/**
* @brief Constructor.
*
- * @param share_id The unique integer identifier for each equivalence class of
- * common subexpressions.
- * @param operand The underlying scalar subexpression.
+ * @param share_id The unique integer identifier for each equivalence class
+ * of common subexpressions.
+ * @param operand The underlying scalar subexpression, which this
+ * ScalarSharedExpression takes ownership of.
**/
- ScalarSharedExpression(const int share_id, Scalar *operand);
+ ScalarSharedExpression(const int share_id, Scalar *operand)
+ : Scalar(operand->getType()),
+ share_id_(share_id),
+ operand_(operand) {
+ }
/**
* @brief Destructor.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp b/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp
index 2de9e84..7182642 100644
--- a/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp
+++ b/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp
@@ -309,7 +309,9 @@ TEST_F(ScalarCaseExpressionTest, BasicComparisonAndLiteralWithFilteredInputTest)
varchar_type));
ColumnVectorPtr result_cv(
- case_expr.getAllValues(filtered_accessor.get(), nullptr, nullptr));
+ case_expr.getAllValues(filtered_accessor.get(),
+ nullptr /* sub_blocks_ref */,
+ nullptr /* cv_cache */));
ASSERT_FALSE(result_cv->isNative());
const IndirectColumnVector &indirect_result_cv
= static_cast<const IndirectColumnVector&>(*result_cv);
@@ -381,7 +383,9 @@ TEST_F(ScalarCaseExpressionTest, WhenClauseOrderTest) {
varchar_type));
ColumnVectorPtr result_cv(
- case_expr.getAllValues(&sample_data_value_accessor_, nullptr, nullptr));
+ case_expr.getAllValues(&sample_data_value_accessor_,
+ nullptr /* sub_blocks_ref */,
+ nullptr /* cv_cache */));
ASSERT_FALSE(result_cv->isNative());
const IndirectColumnVector &indirect_result_cv
= static_cast<const IndirectColumnVector&>(*result_cv);
@@ -475,7 +479,9 @@ TEST_F(ScalarCaseExpressionTest, ComplexConjunctionAndCalculatedExpressionTest)
new ScalarAttribute(*sample_relation_->getAttributeById(0))));
ColumnVectorPtr result_cv(
- case_expr.getAllValues(&sample_data_value_accessor_, nullptr, nullptr));
+ case_expr.getAllValues(&sample_data_value_accessor_,
+ nullptr /* sub_blocks_ref */,
+ nullptr /* cv_cache */));
ASSERT_TRUE(result_cv->isNative());
const NativeColumnVector &native_result_cv
= static_cast<const NativeColumnVector&>(*result_cv);
@@ -598,7 +604,9 @@ TEST_F(ScalarCaseExpressionTest,
new ScalarAttribute(*sample_relation_->getAttributeById(0))));
ColumnVectorPtr result_cv(
- case_expr.getAllValues(filtered_accessor.get(), nullptr, nullptr));
+ case_expr.getAllValues(filtered_accessor.get(),
+ nullptr /* sub_blocks_ref */,
+ nullptr /* cv_cache */));
ASSERT_TRUE(result_cv->isNative());
const NativeColumnVector &native_result_cv
= static_cast<const NativeColumnVector&>(*result_cv);
@@ -708,7 +716,9 @@ TEST_F(ScalarCaseExpressionTest, ComplexDisjunctionAndCalculatedExpressionTest)
new ScalarAttribute(*sample_relation_->getAttributeById(0))));
ColumnVectorPtr result_cv(
- case_expr.getAllValues(&sample_data_value_accessor_, nullptr, nullptr));
+ case_expr.getAllValues(&sample_data_value_accessor_,
+ nullptr /* sub_blocks_ref */,
+ nullptr /* cv_cache */));
ASSERT_TRUE(result_cv->isNative());
const NativeColumnVector &native_result_cv
= static_cast<const NativeColumnVector&>(*result_cv);
@@ -828,7 +838,9 @@ TEST_F(ScalarCaseExpressionTest,
new ScalarAttribute(*sample_relation_->getAttributeById(0))));
ColumnVectorPtr result_cv(
- case_expr.getAllValues(filtered_accessor.get(), nullptr, nullptr));
+ case_expr.getAllValues(filtered_accessor.get(),
+ nullptr /* sub_blocks_ref */,
+ nullptr /* cv_cache */));
ASSERT_TRUE(result_cv->isNative());
const NativeColumnVector &native_result_cv
= static_cast<const NativeColumnVector&>(*result_cv);
@@ -935,7 +947,7 @@ TEST_F(ScalarCaseExpressionTest, JoinTest) {
1,
&other_accessor,
joined_tuple_ids,
- nullptr));
+ nullptr /* cv_cache */));
ASSERT_TRUE(result_cv->isNative());
const NativeColumnVector &native_result_cv
= static_cast<const NativeColumnVector&>(*result_cv);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/expressions/CommonSubexpression.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CommonSubexpression.hpp b/query_optimizer/expressions/CommonSubexpression.hpp
index ce7589d..2c2d86c 100644
--- a/query_optimizer/expressions/CommonSubexpression.hpp
+++ b/query_optimizer/expressions/CommonSubexpression.hpp
@@ -126,8 +126,8 @@ class CommonSubexpression : public Scalar {
addChild(operand);
}
- ExprId common_subexpression_id_;
- ScalarPtr operand_;
+ const ExprId common_subexpression_id_;
+ const ScalarPtr operand_;
DISALLOW_COPY_AND_ASSIGN(CommonSubexpression);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/expressions/SimpleCase.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/SimpleCase.cpp b/query_optimizer/expressions/SimpleCase.cpp
index ccdd8e5..b127d51 100644
--- a/query_optimizer/expressions/SimpleCase.cpp
+++ b/query_optimizer/expressions/SimpleCase.cpp
@@ -195,10 +195,8 @@ bool SimpleCase::equals(const ScalarPtr &other) const {
return false;
}
}
- if ((else_result_expression_ == nullptr
- || expr->else_result_expression_ == nullptr)
- && else_result_expression_ != expr->else_result_expression_) {
- return false;
+ if (else_result_expression_ == nullptr || expr->else_result_expression_ == nullptr) {
+ return else_result_expression_ == expr->else_result_expression_;
}
if (!else_result_expression_->equals(expr->else_result_expression_)) {
return false;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/rules/CollapseSelection.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CollapseSelection.cpp b/query_optimizer/rules/CollapseSelection.cpp
index e5427b4..f92e1b2 100644
--- a/query_optimizer/rules/CollapseSelection.cpp
+++ b/query_optimizer/rules/CollapseSelection.cpp
@@ -46,7 +46,7 @@ P::PhysicalPtr CollapseSelection::applyToNode(const P::PhysicalPtr &input) {
selection->project_expressions();
PullUpProjectExpressions(child_selection->project_expressions(),
{} /* non_project_expression_lists */,
- {&project_expressions} /* project_expression_lists */);
+ { &project_expressions } /* project_expression_lists */);
return P::Selection::Create(child_selection->input(),
project_expressions,
selection->filter_predicate());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/rules/CollapseSelection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CollapseSelection.hpp b/query_optimizer/rules/CollapseSelection.hpp
index bc5e4a3..25c3492 100644
--- a/query_optimizer/rules/CollapseSelection.hpp
+++ b/query_optimizer/rules/CollapseSelection.hpp
@@ -43,6 +43,8 @@ class CollapseSelection : public BottomUpRule<physical::Physical> {
*/
CollapseSelection() {}
+ ~CollapseSelection() override {}
+
std::string getName() const override {
return "CollapseSelection";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/rules/ExtractCommonSubexpression.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.cpp b/query_optimizer/rules/ExtractCommonSubexpression.cpp
index e3f996c..63b6b17 100644
--- a/query_optimizer/rules/ExtractCommonSubexpression.cpp
+++ b/query_optimizer/rules/ExtractCommonSubexpression.cpp
@@ -258,7 +258,7 @@ E::ExpressionPtr ExtractCommonSubexpression::transformExpression(
bool ExtractCommonSubexpression::visitAndCount(
const E::ExpressionPtr &expression,
ScalarCounter *counter,
- ScalarHashable *hashable) {
+ ScalarHashable *hashable) const {
// This bool flag is for avoiding some unnecessary hash() computation.
bool children_hashable = true;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/rules/ExtractCommonSubexpression.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.hpp b/query_optimizer/rules/ExtractCommonSubexpression.hpp
index 3cdd70e..26b09cc 100644
--- a/query_optimizer/rules/ExtractCommonSubexpression.hpp
+++ b/query_optimizer/rules/ExtractCommonSubexpression.hpp
@@ -111,7 +111,7 @@ class ExtractCommonSubexpression : public Rule<physical::Physical> {
bool visitAndCount(
const expressions::ExpressionPtr &expression,
ScalarCounter *counter,
- ScalarHashable *hashable);
+ ScalarHashable *hashable) const;
// Traverse the expression tree and transform subexpressions (to common
// subexpressions) if applicable.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/rules/ReuseAggregateExpressions.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.cpp b/query_optimizer/rules/ReuseAggregateExpressions.cpp
index 79dede6..a7c62c6 100644
--- a/query_optimizer/rules/ReuseAggregateExpressions.cpp
+++ b/query_optimizer/rules/ReuseAggregateExpressions.cpp
@@ -157,7 +157,7 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
count_star_ref = *it;
for (++it; it != count_star_info.end(); ++it) {
- agg_refs[*it].reset(new AggregateReference(count_star_ref));
+ agg_refs[*it] = std::make_unique<AggregateReference>(count_star_ref);
}
}
@@ -194,7 +194,7 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
sum_it == ref_map.end() ? kInvalidRef : sum_it->second.front();
for (const std::size_t idx : avg_it->second) {
- agg_refs[idx].reset(new AggregateReference(sum_ref, count_ref));
+ agg_refs[idx] = std::make_unique<AggregateReference>(sum_ref, count_ref);
}
is_avg_processed = true;
}
@@ -209,7 +209,7 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
DCHECK(!indices.empty());
const std::size_t ref = indices.front();
for (std::size_t i = 1; i < indices.size(); ++i) {
- agg_refs[indices[i]].reset(new AggregateReference(ref));
+ agg_refs[indices[i]] = std::make_unique<AggregateReference>(ref);
}
}
}
@@ -329,6 +329,7 @@ P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
agg_expr->attribute_name(),
agg_expr->attribute_alias(),
agg_expr->relation_name()));
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/30021acf/query_optimizer/rules/ReuseAggregateExpressions.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.hpp b/query_optimizer/rules/ReuseAggregateExpressions.hpp
index 182e9d9..12f81d6 100644
--- a/query_optimizer/rules/ReuseAggregateExpressions.hpp
+++ b/query_optimizer/rules/ReuseAggregateExpressions.hpp
@@ -98,6 +98,8 @@ class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> {
explicit ReuseAggregateExpressions(OptimizerContext *optimizer_context)
: optimizer_context_(optimizer_context) {}
+ ~ReuseAggregateExpressions() override {}
+
std::string getName() const override {
return "ReuseAggregateExpressions";
}