You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/31 07:06:06 UTC
[61/62] [abbrv] incubator-quickstep git commit: Initial commit.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/expressions/aggregation/AggregationHandleSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.cpp b/expressions/aggregation/AggregationHandleSum.cpp
index 642d88d..00b229e 100644
--- a/expressions/aggregation/AggregationHandleSum.cpp
+++ b/expressions/aggregation/AggregationHandleSum.cpp
@@ -25,8 +25,8 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "storage/HashTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/PackedPayloadAggregationStateHashTable.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
#include "types/TypeFactory.hpp"
@@ -43,7 +43,8 @@ namespace quickstep {
class StorageManager;
AggregationHandleSum::AggregationHandleSum(const Type &type)
- : argument_type_(type), block_update_(false) {
+ : AggregationConcreteHandle(AggregationID::kSum),
+ argument_type_(type) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_precision_id;
@@ -79,47 +80,26 @@ AggregationHandleSum::AggregationHandleSum(const Type &type)
result_type_ = &sum_type.getNullableVersion();
}
-AggregationStateHashTableBase* AggregationHandleSum::createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const {
- return AggregationStateHashTableFactory<AggregationStateSum>::CreateResizable(
- hash_table_impl, group_by_types, estimated_num_groups, storage_manager);
-}
+AggregationState* AggregationHandleSum::accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const {
+ DCHECK_EQ(1u, argument_ids.size())
+ << "Got wrong number of attributes for SUM: " << argument_ids.size();
-AggregationState* AggregationHandleSum::accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors) const {
- DCHECK_EQ(1u, column_vectors.size())
- << "Got wrong number of ColumnVectors for SUM: " << column_vectors.size();
- std::size_t num_tuples = 0;
- TypedValue cv_sum = fast_operator_->accumulateColumnVector(
- blank_state_.sum_, *column_vectors.front(), &num_tuples);
- return new AggregationStateSum(std::move(cv_sum), num_tuples == 0);
-}
+ const attribute_id argument_id = argument_ids.front();
+ DCHECK_NE(argument_id, kInvalidAttributeID);
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-AggregationState* AggregationHandleSum::accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_ids) const {
- DCHECK_EQ(1u, accessor_ids.size())
- << "Got wrong number of attributes for SUM: " << accessor_ids.size();
+ ValueAccessor *target_accessor =
+ argument_id >= 0 ? accessor : aux_accessor;
+ const attribute_id target_argument_id =
+ argument_id >= 0 ? argument_id : -(argument_id+2);
std::size_t num_tuples = 0;
TypedValue va_sum = fast_operator_->accumulateValueAccessor(
- blank_state_.sum_, accessor, accessor_ids.front(), &num_tuples);
+ blank_state_.sum_, target_accessor, target_argument_id, &num_tuples);
return new AggregationStateSum(std::move(va_sum), num_tuples == 0);
}
-#endif
-
-void AggregationHandleSum::aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const {
- DCHECK_EQ(1u, argument_ids.size())
- << "Got wrong number of arguments for SUM: " << argument_ids.size();
-}
void AggregationHandleSum::mergeStates(const AggregationState &source,
AggregationState *destination) const {
@@ -134,8 +114,8 @@ void AggregationHandleSum::mergeStates(const AggregationState &source,
sum_destination->null_ = sum_destination->null_ && sum_source.null_;
}
-void AggregationHandleSum::mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const {
+void AggregationHandleSum::mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const {
const TypedValue *src_sum_ptr =
reinterpret_cast<const TypedValue *>(source + blank_state_.sum_offset_);
const bool *src_null_ptr =
@@ -164,27 +144,10 @@ ColumnVector* AggregationHandleSum::finalizeHashTable(
const AggregationStateHashTableBase &hash_table,
std::vector<std::vector<TypedValue>> *group_by_keys,
int index) const {
- return finalizeHashTableHelperFast<AggregationHandleSum,
- AggregationStateFastHashTable>(
- *result_type_, hash_table, group_by_keys, index);
-}
-
-AggregationState*
-AggregationHandleSum::aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table) const {
- return aggregateOnDistinctifyHashTableForSingleUnaryHelperFast<
- AggregationHandleSum,
- AggregationStateSum>(distinctify_hash_table);
-}
-
-void AggregationHandleSum::aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const {
- aggregateOnDistinctifyHashTableForGroupByUnaryHelperFast<
+ return finalizeHashTableHelper<
AggregationHandleSum,
- AggregationStateFastHashTable>(
- distinctify_hash_table, aggregation_hash_table, index);
+ PackedPayloadSeparateChainingAggregationStateHashTable>(
+ *result_type_, hash_table, group_by_keys, index);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/expressions/aggregation/AggregationHandleSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationHandleSum.hpp b/expressions/aggregation/AggregationHandleSum.hpp
index f0d23e1..9fb7706 100644
--- a/expressions/aggregation/AggregationHandleSum.hpp
+++ b/expressions/aggregation/AggregationHandleSum.hpp
@@ -28,7 +28,6 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/aggregation/AggregationConcreteHandle.hpp"
#include "expressions/aggregation/AggregationHandle.hpp"
-#include "storage/FastHashTable.hpp"
#include "storage/HashTableBase.hpp"
#include "threading/SpinMutex.hpp"
#include "types/Type.hpp"
@@ -41,6 +40,7 @@
namespace quickstep {
class ColumnVector;
+class ColumnVectorsValueAccessor;
class StorageManager;
class ValueAccessor;
@@ -101,16 +101,18 @@ class AggregationHandleSum : public AggregationConcreteHandle {
public:
~AggregationHandleSum() override {}
+ std::vector<const Type *> getArgumentTypes() const override {
+ return {&argument_type_};
+ }
+
+ const Type* getResultType() const override {
+ return result_type_;
+ }
+
AggregationState* createInitialState() const override {
return new AggregationStateSum(blank_state_);
}
- AggregationStateHashTableBase* createGroupByHashTable(
- const HashTableImplType hash_table_impl,
- const std::vector<const Type *> &group_by_types,
- const std::size_t estimated_num_groups,
- StorageManager *storage_manager) const override;
-
inline void iterateUnaryInl(AggregationStateSum *state,
const TypedValue &value) const {
DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
@@ -121,28 +123,19 @@ class AggregationHandleSum : public AggregationConcreteHandle {
state->null_ = false;
}
- inline void iterateUnaryInlFast(const TypedValue &value,
- std::uint8_t *byte_ptr) const {
- DCHECK(value.isPlausibleInstanceOf(argument_type_.getSignature()));
- if (value.isNull()) return;
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
- bool *null_ptr =
- reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
- *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, value);
- *null_ptr = false;
- }
+ AggregationState* accumulate(
+ ValueAccessor *accessor,
+ ColumnVectorsValueAccessor *aux_accessor,
+ const std::vector<attribute_id> &argument_ids) const override;
- inline void updateStateUnary(const TypedValue &argument,
- std::uint8_t *byte_ptr) const override {
- if (!block_update_) {
- iterateUnaryInlFast(argument, byte_ptr);
- }
- }
+ void mergeStates(const AggregationState &source,
+ AggregationState *destination) const override;
- void blockUpdate() override { block_update_ = true; }
+ TypedValue finalize(const AggregationState &state) const override;
- void allowUpdate() override { block_update_ = false; }
+ std::size_t getPayloadSize() const override {
+ return blank_state_.getPayloadSize();
+ }
void initPayload(std::uint8_t *byte_ptr) const override {
TypedValue *sum_ptr =
@@ -161,41 +154,23 @@ class AggregationHandleSum : public AggregationConcreteHandle {
}
}
- AggregationState* accumulateColumnVectors(
- const std::vector<std::unique_ptr<ColumnVector>> &column_vectors)
- const override;
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- AggregationState* accumulateValueAccessor(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &accessor_id) const override;
-#endif
-
- void aggregateValueAccessorIntoHashTable(
- ValueAccessor *accessor,
- const std::vector<attribute_id> &argument_ids,
- const std::vector<attribute_id> &group_by_key_ids,
- AggregationStateHashTableBase *hash_table) const override;
-
- void mergeStates(const AggregationState &source,
- AggregationState *destination) const override;
-
- void mergeStatesFast(const std::uint8_t *source,
- std::uint8_t *destination) const override;
-
- TypedValue finalize(const AggregationState &state) const override;
-
- inline TypedValue finalizeHashTableEntry(
- const AggregationState &state) const {
- return static_cast<const AggregationStateSum &>(state).sum_;
+ inline void updateStateUnary(const TypedValue &argument,
+ std::uint8_t *byte_ptr) const override {
+ DCHECK(argument.isPlausibleInstanceOf(argument_type_.getSignature()));
+ if (argument.isNull()) return;
+ TypedValue *sum_ptr =
+ reinterpret_cast<TypedValue *>(byte_ptr + blank_state_.sum_offset_);
+ bool *null_ptr =
+ reinterpret_cast<bool *>(byte_ptr + blank_state_.null_offset_);
+ *sum_ptr = fast_operator_->applyToTypedValues(*sum_ptr, argument);
+ *null_ptr = false;
}
- inline TypedValue finalizeHashTableEntryFast(
- const std::uint8_t *byte_ptr) const {
- std::uint8_t *value_ptr = const_cast<std::uint8_t *>(byte_ptr);
- TypedValue *sum_ptr =
- reinterpret_cast<TypedValue *>(value_ptr + blank_state_.sum_offset_);
- return *sum_ptr;
+ void mergeStates(const std::uint8_t *source,
+ std::uint8_t *destination) const override;
+
+ inline TypedValue finalizeHashTableEntry(const std::uint8_t *byte_ptr) const {
+ return *reinterpret_cast<const TypedValue *>(byte_ptr + blank_state_.sum_offset_);
}
ColumnVector* finalizeHashTable(
@@ -203,29 +178,6 @@ class AggregationHandleSum : public AggregationConcreteHandle {
std::vector<std::vector<TypedValue>> *group_by_keys,
int index) const override;
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForSingle()
- * for SUM aggregation.
- */
- AggregationState* aggregateOnDistinctifyHashTableForSingle(
- const AggregationStateHashTableBase &distinctify_hash_table)
- const override;
-
- /**
- * @brief Implementation of
- * AggregationHandle::aggregateOnDistinctifyHashTableForGroupBy()
- * for SUM aggregation.
- */
- void aggregateOnDistinctifyHashTableForGroupBy(
- const AggregationStateHashTableBase &distinctify_hash_table,
- AggregationStateHashTableBase *aggregation_hash_table,
- std::size_t index) const override;
-
- std::size_t getPayloadSize() const override {
- return blank_state_.getPayloadSize();
- }
-
private:
friend class AggregateFunctionSum;
@@ -242,8 +194,6 @@ class AggregationHandleSum : public AggregationConcreteHandle {
std::unique_ptr<UncheckedBinaryOperator> fast_operator_;
std::unique_ptr<UncheckedBinaryOperator> merge_operator_;
- bool block_update_;
-
DISALLOW_COPY_AND_ASSIGN(AggregationHandleSum);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/expressions/aggregation/AggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/AggregationID.hpp b/expressions/aggregation/AggregationID.hpp
index 1efb35c..cd18d47 100644
--- a/expressions/aggregation/AggregationID.hpp
+++ b/expressions/aggregation/AggregationID.hpp
@@ -32,9 +32,11 @@ namespace quickstep {
enum class AggregationID {
kAvg = 0,
kCount,
+ kDistinct,
kMax,
kMin,
- kSum
+ kSum,
+ kUnknown
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/expressions/aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/aggregation/CMakeLists.txt b/expressions/aggregation/CMakeLists.txt
index e9503f7..bd239d4 100644
--- a/expressions/aggregation/CMakeLists.txt
+++ b/expressions/aggregation/CMakeLists.txt
@@ -146,10 +146,8 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
glog
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
quickstep_threading_SpinMutex
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
@@ -157,6 +155,7 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationConcreteHandl
target_link_libraries(quickstep_expressions_aggregation_AggregationHandle
glog
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
quickstep_types_TypedValue
quickstep_utility_Macros)
@@ -165,10 +164,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleAvg
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeFactory
@@ -183,12 +181,12 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleCount
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_storage_ValueAccessor
quickstep_storage_ValueAccessorUtil
+ quickstep_types_LongType
quickstep_types_TypeFactory
quickstep_types_TypeID
quickstep_types_TypedValue
@@ -199,8 +197,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleDistinc
glog
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_types_TypedValue
quickstep_utility_Macros)
target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
@@ -208,10 +207,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMax
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypedValue
@@ -225,10 +223,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleMin
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypedValue
@@ -242,10 +239,9 @@ target_link_libraries(quickstep_expressions_aggregation_AggregationHandleSum
quickstep_catalog_CatalogTypedefs
quickstep_expressions_aggregation_AggregationConcreteHandle
quickstep_expressions_aggregation_AggregationHandle
- quickstep_storage_FastHashTable
- quickstep_storage_HashTable
+ quickstep_expressions_aggregation_AggregationID
quickstep_storage_HashTableBase
- quickstep_storage_HashTableFactory
+ quickstep_storage_PackedPayloadAggregationStateHashTable
quickstep_threading_SpinMutex
quickstep_types_Type
quickstep_types_TypeFactory
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 895c2ea..ed0f99c 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -200,20 +200,6 @@ class QueryContext {
}
/**
- * @brief Destroy the payloads from the aggregation hash tables.
- *
- * @warning After calling these methods, the hash table will be in an invalid
- * state. No other operation should be performed on them.
- *
- * @param id The ID of the AggregationOperationState.
- **/
- inline void destroyAggregationHashTablePayload(const aggregation_state_id id) {
- DCHECK_LT(id, aggregation_states_.size());
- DCHECK(aggregation_states_[id]);
- aggregation_states_[id]->destroyAggregationHashTablePayload();
- }
-
- /**
* @brief Whether the given GeneratorFunctionHandle id is valid.
*
* @param id The GeneratorFunctionHandle id.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index b6c794d..61dea01 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,6 +64,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunction_proto
+ quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
@@ -123,6 +124,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_HashJoinOperator
+ quickstep_relationaloperators_InitializeAggregationStateOperator
quickstep_relationaloperators_InsertOperator
quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_relationaloperators_RelationalOperator
@@ -143,6 +145,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_storage_StorageBlockLayout_proto
quickstep_storage_SubBlockTypeRegistry
quickstep_types_Type
+ quickstep_types_TypeID
quickstep_types_Type_proto
quickstep_types_TypedValue
quickstep_types_TypedValue_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index e25b8ad..960fe67 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,6 +49,7 @@
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
@@ -103,6 +104,7 @@
#include "relational_operators/DropTableOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
#include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationStateOperator.hpp"
#include "relational_operators/InsertOperator.hpp"
#include "relational_operators/NestedLoopsJoinOperator.hpp"
#include "relational_operators/RelationalOperator.hpp"
@@ -124,6 +126,7 @@
#include "storage/SubBlockTypeRegistry.hpp"
#include "types/Type.hpp"
#include "types/Type.pb.h"
+#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"
#include "types/containers/Tuple.pb.h"
@@ -366,6 +369,91 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
}
}
+bool ExecutionGenerator::canUseCollisionFreeAggregation(
+ const P::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups,
+ std::size_t *exact_num_groups) const {
+ if (aggregate->grouping_expressions().size() != 1) {
+ return false;
+ }
+
+ E::AttributeReferencePtr group_by_key_attr;
+ const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
+ if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+ return false;
+ }
+
+ bool min_value_stat_is_exact;
+ bool max_value_stat_is_exact;
+ const TypedValue min_value =
+ cost_model_for_aggregation_->findMinValueStat(
+ aggregate, group_by_key_attr, &min_value_stat_is_exact);
+ const TypedValue max_value =
+ cost_model_for_aggregation_->findMaxValueStat(
+ aggregate, group_by_key_attr, &max_value_stat_is_exact);
+ if (min_value.isNull() || max_value.isNull() ||
+ (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+ return false;
+ }
+
+ std::int64_t min_cpp_value;
+ std::int64_t max_cpp_value;
+ switch (group_by_key_attr->getValueType().getTypeID()) {
+ case TypeID::kInt: {
+ min_cpp_value = min_value.getLiteral<int>();
+ max_cpp_value = max_value.getLiteral<int>();
+ break;
+ }
+ case TypeID::kLong: {
+ min_cpp_value = min_value.getLiteral<std::int64_t>();
+ max_cpp_value = max_value.getLiteral<std::int64_t>();
+ break;
+ }
+ default:
+ return false;
+ }
+
+ // TODO
+ if (min_cpp_value < 0 ||
+ max_cpp_value > 1000000000 ||
+ max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+ return false;
+ }
+
+
+ for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+ const E::AggregateFunctionPtr agg_func =
+ std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+ switch (agg_func->getAggregate().getAggregationID()) {
+ case AggregationID::kCount: // Fall through
+ case AggregationID::kSum:
+ break;
+ default:
+ return false;
+ }
+
+ const auto &arguments = agg_func->getArguments();
+ if (arguments.size() > 1) {
+ return false;
+ }
+
+ if (arguments.size() == 1) {
+ switch (arguments.front()->getValueType().getTypeID()) {
+ case TypeID::kInt: // Fall through
+ case TypeID::kLong:
+ case TypeID::kFloat:
+ case TypeID::kDouble:
+ break;
+ default:
+ return false;
+ }
+ }
+ }
+
+ *exact_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+ return true;
+}
+
void ExecutionGenerator::convertNamedExpressions(
const std::vector<E::NamedExpressionPtr> &named_expressions,
S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1392,6 +1480,8 @@ void ExecutionGenerator::convertAggregate(
findRelationInfoOutputByPhysical(physical_plan->input());
aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ bool use_parallel_initialization = false;
+
std::vector<const Type*> group_by_types;
for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
unique_ptr<const Scalar> execution_group_by_expression;
@@ -1412,9 +1502,34 @@ void ExecutionGenerator::convertAggregate(
}
if (!group_by_types.empty()) {
- // Right now, only SeparateChaining is supported.
- aggr_state_proto->set_hash_table_impl_type(
- serialization::HashTableImplType::SEPARATE_CHAINING);
+ const std::size_t estimated_num_groups =
+ cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
+
+ std::size_t exact_num_groups;
+ const bool can_use_collision_free_aggregation =
+ canUseCollisionFreeAggregation(physical_plan,
+ estimated_num_groups,
+ &exact_num_groups);
+
+ if (can_use_collision_free_aggregation) {
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+ std::cout << "Use collision free aggregation!\n"
+ << "Size = " << exact_num_groups << "\n";
+
+ aggr_state_proto->set_estimated_num_entries(exact_num_groups);
+ use_parallel_initialization = true;
+ } else {
+ // Otherwise, use SeparateChaining.
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ std::cout << "Use normal aggregation\n"
+ << "Size = " << estimated_num_groups << "\n";
+
+ aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
+ }
+ } else {
+ aggr_state_proto->set_estimated_num_entries(1uL);
}
for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1452,10 +1567,6 @@ void ExecutionGenerator::convertAggregate(
aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
}
- const std::size_t estimated_num_groups =
- cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
- aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
-
const QueryPlan::DAGNodeIndex aggregation_operator_index =
execution_plan_->addRelationalOperator(
new AggregationOperator(
@@ -1470,6 +1581,18 @@ void ExecutionGenerator::convertAggregate(
false /* is_pipeline_breaker */);
}
+ if (use_parallel_initialization) {
+ const QueryPlan::DAGNodeIndex initialize_aggregation_state_operator_index =
+ execution_plan_->addRelationalOperator(
+ new InitializeAggregationStateOperator(
+ query_handle_->query_id(),
+ aggr_state_index));
+
+ execution_plan_->addDirectDependency(aggregation_operator_index,
+ initialize_aggregation_state_operator_index,
+ true);
+ }
+
// Create InsertDestination proto.
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 55197c9..411fd4e 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
#define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
+#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
@@ -37,6 +38,7 @@
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
@@ -202,6 +204,10 @@ class ExecutionGenerator {
*/
std::string getNewRelationName();
+ bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups,
+ std::size_t *exact_num_groups) const;
+
/**
* @brief Sets up the info of the CatalogRelation represented by TableReference.
* TableReference is not converted to any operator.
@@ -419,7 +425,7 @@ class ExecutionGenerator {
/**
* @brief The cost model to use for estimating aggregation hash table size.
*/
- std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
+ std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_;
/**
* @brief The cost model to use for estimating join hash table size.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 90133e7..d76a6b3 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -49,6 +49,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
glog
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogRelationStatistics
+ quickstep_catalog_CatalogTypedefs
quickstep_queryoptimizer_costmodel_CostModel
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_ComparisonExpression
@@ -72,6 +73,8 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
quickstep_queryoptimizer_physical_TableReference
quickstep_queryoptimizer_physical_TopLevelPlan
quickstep_queryoptimizer_physical_WindowAggregate
+ quickstep_types_NullType
+ quickstep_types_TypedValue
quickstep_utility_Macros)
# Module all-in-one library:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 75b1b2b..b9606a2 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -21,11 +21,11 @@
#include <algorithm>
#include <memory>
-#include <unordered_map>
#include <vector>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogRelationStatistics.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ComparisonExpression.hpp"
@@ -48,6 +48,8 @@
#include "query_optimizer/physical/TableGenerator.hpp"
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/NullType.hpp"
+#include "types/TypedValue.hpp"
#include "glog/logging.h"
@@ -383,18 +385,135 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate(
std::size_t StarSchemaSimpleCostModel::getNumDistinctValues(
    const E::ExprId attribute_id,
    const P::TableReferencePtr &table_reference) {
-  const CatalogRelation &relation = *table_reference->relation();
-  const std::vector<E::AttributeReferencePtr> &attributes = table_reference->attribute_list();
-  for (std::size_t i = 0; i < attributes.size(); ++i) {
-    if (attributes[i]->id() == attribute_id) {
-      const CatalogRelationStatistics &stat = relation.getStatistics();
-      if (stat.hasNumDistinctValues(i)) {
-        return stat.getNumDistinctValues(i);
+  // `auto`: the parameter attribute_id shadows the attribute_id typedef here.
+  const auto rel_attr_id =
+      findCatalogRelationAttributeId(table_reference, attribute_id);
+  if (rel_attr_id != kInvalidAttributeID) {
+    const CatalogRelationStatistics &stat =
+        table_reference->relation()->getStatistics();
+    if (stat.hasNumDistinctValues(rel_attr_id)) {
+      return stat.getNumDistinctValues(rel_attr_id);
+    }
+  }
+  return estimateCardinalityForTableReference(table_reference);
+}
+
+bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
+    const P::PhysicalPtr &physical_plan,
+    const std::vector<E::AttributeReferencePtr> &attributes) {
+  switch (physical_plan->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(physical_plan);
+      return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
+    }
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr &hash_join =
+          std::static_pointer_cast<const P::HashJoin>(physical_plan);
+      bool unique_from_left =
+          impliesUniqueAttributes(hash_join->right(), hash_join->right_join_attributes())
+              && impliesUniqueAttributes(hash_join->left(), attributes);
+      bool unique_from_right =
+          impliesUniqueAttributes(hash_join->left(), hash_join->left_join_attributes())
+              && impliesUniqueAttributes(hash_join->right(), attributes);
+      return unique_from_left || unique_from_right;
+    }
+    case P::PhysicalType::kTableReference: {
+      const P::TableReferencePtr &table_reference =
+          std::static_pointer_cast<const P::TableReference>(physical_plan);
+      const CatalogRelationStatistics &stat =
+          table_reference->relation()->getStatistics();
+      // NOTE(review): these statistics may be estimated or outdated (see
+      // stat.isExact()), in which case equal distinct-value and tuple counts
+      // do not strictly guarantee uniqueness; consider also requiring
+      // stat.isExact() here.
+      if (stat.hasNumTuples()) {
+        const std::size_t num_tuples = stat.getNumTuples();
+        for (const auto &attr : attributes) {
+          const attribute_id rel_attr_id =
+              findCatalogRelationAttributeId(table_reference, attr->id());
+          if (rel_attr_id != kInvalidAttributeID &&
+              stat.hasNumDistinctValues(rel_attr_id) &&
+              stat.getNumDistinctValues(rel_attr_id) == num_tuples) {
+            return true;
+          }
+        }
      }
+      return false;
+    }
+    case P::PhysicalType::kSample:  // Fall through
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kSort: {
+      DCHECK_EQ(physical_plan->getNumChildren(), 1u);
+      return impliesUniqueAttributes(physical_plan->children()[0], attributes);
+    }
+    default:
      break;
+  }
+  return false;
+}
+
+TypedValue StarSchemaSimpleCostModel::findCatalogRelationStat(
+    const P::PhysicalPtr &physical_plan,
+    const E::ExprId attr_id,
+    const StatType stat_type,
+    bool *is_exact_stat) {
+  // Write a definite value up front so the out-parameter is never left unset,
+  // e.g. when no catalog statistic exists and a NULL value is returned.
+  if (is_exact_stat != nullptr) {
+    *is_exact_stat = false;
+  }
+
+  P::TableReferencePtr table_reference;
+  if (P::SomeTableReference::MatchesWithConditionalCast(physical_plan, &table_reference)) {
+    const attribute_id rel_attr_id =
+        findCatalogRelationAttributeId(table_reference, attr_id);
+    if (rel_attr_id != kInvalidAttributeID) {
+      const CatalogRelationStatistics &stat =
+          table_reference->relation()->getStatistics();
+
+      if (is_exact_stat != nullptr) {
+        *is_exact_stat = stat.isExact();
+      }
+
+      switch (stat_type) {
+        case StatType::kMin: {
+          if (stat.hasMinValue(rel_attr_id)) {
+            return stat.getMinValue(rel_attr_id);
+          }
+          break;
+        }
+        case StatType::kMax: {
+          if (stat.hasMaxValue(rel_attr_id)) {
+            return stat.getMaxValue(rel_attr_id);
+          }
+          break;
+        }
+        default:
+          break;
+      }
+      return NullType::InstanceNullable().makeNullValue();
    }
  }
-  return estimateCardinalityForTableReference(table_reference);
+
+  for (const auto &child : physical_plan->children()) {
+    if (E::ContainsExprId(child->getOutputAttributes(), attr_id)) {
+      return findCatalogRelationStat(child, attr_id, stat_type, is_exact_stat);
+    }
+  }
+  return NullType::InstanceNullable().makeNullValue();
+}
+
+attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
+    const physical::TableReferencePtr &table_reference,
+    const expressions::ExprId expr_id) {
+  const auto &attribute_list = table_reference->attribute_list();
+  for (std::size_t i = 0; i < attribute_list.size(); ++i) {
+    if (attribute_list[i]->id() == expr_id) {
+      return i;
+    }
+  }
+  return kInvalidAttributeID;
+}
} // namespace cost
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index 6f6aa29..8d3ef7b 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -23,7 +23,9 @@
#include <cstddef>
#include <vector>
+#include "catalog/CatalogTypedefs.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
#include "query_optimizer/physical/Aggregate.hpp"
@@ -36,6 +38,7 @@
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "query_optimizer/physical/WindowAggregate.hpp"
+#include "types/TypedValue.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
@@ -105,6 +108,63 @@ class StarSchemaSimpleCostModel : public CostModel {
  double estimateSelectivityForFilterPredicate(
      const physical::PhysicalPtr &physical_plan);
+  /**
+   * @brief Check whether a set of attributes are unique (i.e. have distinct
+   *        values) for a relation.
+   *
+   * @param physical_plan The physical plan that corresponds to a relation.
+   * @param attributes The set of attributes to be checked. Note that each
+   *        attribute in this set must be an output attribute of the physical
+   *        plan.
+   * @return True if it is guaranteed that the attributes are unique; false
+   *         otherwise.
+   */
+  bool impliesUniqueAttributes(
+      const physical::PhysicalPtr &physical_plan,
+      const std::vector<expressions::AttributeReferencePtr> &attributes);
+
+  /**
+   * @brief For a physical plan attribute, find its corresponding catalog
+   *        attribute's MIN statistic. Returns Null value if there is no
+   *        corresponding catalog attribute for the physical plan attribute.
+   *
+   * @param physical_plan The physical plan.
+   * @param attribute The attribute. Must be an output attribute of the given
+   *        physical plan.
+   * @param is_exact_stat If this pointer is not null, its pointed content will
+   *        be modified by this method to indicate whether the returned statistic
+   *        is EXACT for the stored relation (i.e. not outdated or estimated).
+   * @return The MIN statistic for the attribute.
+   */
+  TypedValue findMinValueStat(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      bool *is_exact_stat = nullptr) {
+    return findCatalogRelationStat(
+        physical_plan, attribute->id(), StatType::kMin, is_exact_stat);
+  }
+
+  /**
+   * @brief For a physical plan attribute, find its corresponding catalog
+   *        attribute's MAX statistic. Returns Null value if there is no
+   *        corresponding catalog attribute for the physical plan attribute.
+   *
+   * @param physical_plan The physical plan.
+   * @param attribute The attribute. Must be an output attribute of the given
+   *        physical plan.
+   * @param is_exact_stat If this pointer is not null, its pointed content will
+   *        be modified by this method to indicate whether the returned statistic
+   *        is EXACT for the stored relation (i.e. not outdated or estimated).
+   * @return The MAX statistic for the attribute.
+   */
+  TypedValue findMaxValueStat(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      bool *is_exact_stat = nullptr) {
+    return findCatalogRelationStat(
+        physical_plan, attribute->id(), StatType::kMax, is_exact_stat);
+  }
+
 private:
  std::size_t estimateCardinalityForAggregate(
      const physical::AggregatePtr &physical_plan);
@@ -144,6 +204,25 @@ class StarSchemaSimpleCostModel : public CostModel {
  std::size_t getNumDistinctValues(const expressions::ExprId attribute_id,
                                   const physical::TableReferencePtr &table_reference);
+  enum class StatType {
+    kMax = 0,
+    kMin
+  };
+
+  // For a physical plan attribute, find its corresponding catalog attribute's
+  // min/max statistics. Returns Null value if there is no corresponding catalog
+  // attribute for the physical plan attribute (e.g. the attribute is the result
+  // of an expression).
+  TypedValue findCatalogRelationStat(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::ExprId expr_id,
+      const StatType stat_type,
+      bool *is_exact_stat);
+
+  // For a table reference attribute, find its corresponding catalog attribute.
+  attribute_id findCatalogRelationAttributeId(
+      const physical::TableReferencePtr &table_reference,
+      const expressions::ExprId expr_id);
  DISALLOW_COPY_AND_ASSIGN(StarSchemaSimpleCostModel);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/query_optimizer/expressions/ExpressionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/ExpressionUtil.hpp b/query_optimizer/expressions/ExpressionUtil.hpp
index 422d5ab..6b8666e 100644
--- a/query_optimizer/expressions/ExpressionUtil.hpp
+++ b/query_optimizer/expressions/ExpressionUtil.hpp
@@ -122,12 +122,12 @@ bool ContainsExprId(
* contain the other operand).
* @return True if \p left is a subset of \p right.
*/
-template <class NamedExpressionType>
+template <class LeftNamedExpressionType, class RightNamedExpressionType>
bool SubsetOfExpressions(
- const std::vector<std::shared_ptr<const NamedExpressionType>> &left,
- const std::vector<std::shared_ptr<const NamedExpressionType>> &right) {
+ const std::vector<std::shared_ptr<const LeftNamedExpressionType>> &left,
+ const std::vector<std::shared_ptr<const RightNamedExpressionType>> &right) {
UnorderedNamedExpressionSet supset(right.begin(), right.end());
- for (const std::shared_ptr<const NamedExpressionType> &expr : left) {
+ for (const std::shared_ptr<const LeftNamedExpressionType> &expr : left) {
if (supset.find(expr) == supset.end()) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c1caaa3..da33fd0 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -46,6 +46,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator
FinalizeAggregationOperator.cpp
FinalizeAggregationOperator.hpp)
add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp)
+add_library(quickstep_relationaloperators_InitializeAggregationStateOperator
+ InitializeAggregationStateOperator.cpp
+ InitializeAggregationStateOperator.hpp)
add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp)
add_library(quickstep_relationaloperators_NestedLoopsJoinOperator
NestedLoopsJoinOperator.cpp
@@ -230,6 +233,17 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
quickstep_utility_lipfilter_LIPFilterAdaptiveProber
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
+target_link_libraries(quickstep_relationaloperators_InitializeAggregationStateOperator
+ glog
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_AggregationOperationState
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_InsertOperator
glog
quickstep_catalog_CatalogRelation
@@ -522,6 +536,7 @@ target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_HashJoinOperator
+ quickstep_relationaloperators_InitializeAggregationStateOperator
quickstep_relationaloperators_InsertOperator
quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_relationaloperators_RebuildWorkOrder
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 49be43d..62ca9e7 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -58,13 +58,8 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
}
void DestroyAggregationStateWorkOrder::execute() {
-  // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate
-  // from the destroyAggregationState call. The reason is that the aggregation
-  // hash tables don't own the AggregationHandle objects. However the hash table
-  // class requires the handles for destroying the payload (see the
-  // destroyPayload methods in AggregationHandle classes). Therefore, we first
-  // destroy the payloads in the hash table and then destroy the hash table.
-  query_context_->destroyAggregationHashTablePayload(aggr_state_index_);
+  // destroyAggregationState() is the sole teardown step here; no separate
+  // hash-table payload destruction pass runs before it.
  query_context_->destroyAggregationState(aggr_state_index_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 0cbf635..b66030b 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,15 +44,15 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
AggregationOperationState *agg_state =
query_context->getAggregationState(aggr_state_index_);
DCHECK(agg_state != nullptr);
- for (int part_id = 0;
- part_id < static_cast<int>(agg_state->getNumPartitions());
- ++part_id) {
+ for (std::size_t partition_id = 0;
+ partition_id < agg_state->getNumPartitions();
+ ++partition_id) {
container->addNormalWorkOrder(
new FinalizeAggregationWorkOrder(
query_id_,
+ partition_id,
agg_state,
- query_context->getInsertDestination(output_destination_index_),
- part_id),
+ query_context->getInsertDestination(output_destination_index_)),
op_index_);
}
}
@@ -80,11 +80,8 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
}
void FinalizeAggregationWorkOrder::execute() {
-  if (state_->isAggregatePartitioned()) {
-    state_->finalizeAggregatePartitioned(part_id_, output_destination_);
-  } else {
-    state_->finalizeAggregate(output_destination_);
-  }
+  // Each work order finalizes exactly one partition of the aggregation state.
+  state_->finalizeAggregate(partition_id_, output_destination_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index ae7127a..3c209b1 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
   * @note InsertWorkOrder takes ownership of \c state.
   *
   * @param query_id The ID of the query to which this operator belongs.
+   * @param partition_id The partition of \c state that this work order
+   *        finalizes.
   * @param state The AggregationState to use.
   * @param output_destination The InsertDestination to insert aggregation
   *        results.
-   * @param part_id The partition ID for which the Finalize aggregation work
-   *        order is issued. Ignore if aggregation is not partitioned.
   */
  FinalizeAggregationWorkOrder(const std::size_t query_id,
+                               const std::size_t partition_id,
                               AggregationOperationState *state,
-                               InsertDestination *output_destination,
-                               const int part_id = -1)
+                               InsertDestination *output_destination)
      : WorkOrder(query_id),
+        partition_id_(partition_id),
        state_(DCHECK_NOTNULL(state)),
-        output_destination_(DCHECK_NOTNULL(output_destination)),
-        part_id_(part_id) {}
+        output_destination_(DCHECK_NOTNULL(output_destination)) {}
  ~FinalizeAggregationWorkOrder() override {}
  void execute() override;
 private:
+  const std::size_t partition_id_;  // Partition of state_ to finalize.
  AggregationOperationState *state_;
  InsertDestination *output_destination_;
-  const int part_id_;
  DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/relational_operators/InitializeAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.cpp b/relational_operators/InitializeAggregationStateOperator.cpp
new file mode 100644
index 0000000..dfee459
--- /dev/null
+++ b/relational_operators/InitializeAggregationStateOperator.cpp
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/InitializeAggregationStateOperator.hpp"
+
+#include <cstddef>
+#include <vector>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool InitializeAggregationStateOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  // All work orders are generated on the first call; afterwards started_
+  // stays true, so later calls add nothing and return true.
+  if (!started_) {
+    AggregationOperationState *agg_state =
+        query_context->getAggregationState(aggr_state_index_);
+    DCHECK(agg_state != nullptr);
+
+    // Issue one work order per initialization partition of the state.
+    for (std::size_t partition_id = 0;
+         partition_id < agg_state->getNumInitializationPartitions();
+         ++partition_id) {
+      container->addNormalWorkOrder(
+          new InitializeAggregationStateWorkOrder(query_id_,
+                                                  partition_id,
+                                                  agg_state),
+          op_index_);
+    }
+    started_ = true;
+  }
+  return started_;
+}
+
+bool InitializeAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  // TODO: Emit serialized InitializeAggregationStateWorkOrders into
+  // \p container; this operator currently supports getAllWorkOrders() only.
+  LOG(FATAL) << "Not implemented";
+  // Unreachable: LOG(FATAL) aborts. The return keeps compilers that do not
+  // treat LOG(FATAL) as noreturn from warning about a missing return value.
+  return false;
+}
+
+void InitializeAggregationStateWorkOrder::execute() {
+  state_->initializeState(partition_id_);
+}
+
+}  // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/af6cf511/relational_operators/InitializeAggregationStateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationStateOperator.hpp b/relational_operators/InitializeAggregationStateOperator.hpp
new file mode 100644
index 0000000..10403b3
--- /dev/null
+++ b/relational_operators/InitializeAggregationStateOperator.hpp
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_
+
+#include <cstddef>
+#include <string>
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which initializes an AggregationOperationState stored in
+ *        the QueryContext, with one work order per initialization partition.
+ **/
+class InitializeAggregationStateOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param aggr_state_index The index of the AggregationOperationState in the
+   *        QueryContext that this operator initializes.
+   **/
+  InitializeAggregationStateOperator(const std::size_t query_id,
+                                     const QueryContext::aggregation_state_id aggr_state_index)
+      : RelationalOperator(query_id),
+        aggr_state_index_(aggr_state_index),
+        started_(false) {}
+
+  ~InitializeAggregationStateOperator() override {}
+
+  std::string getName() const override {
+    return "InitializeAggregationStateOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+  const QueryContext::aggregation_state_id aggr_state_index_;
+  bool started_;  // Whether getAllWorkOrders() has issued the work orders.
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateOperator);
+};
+
+/**
+ * @brief A WorkOrder which initializes one partition of an
+ *        AggregationOperationState.
+ **/
+class InitializeAggregationStateWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this work order belongs.
+   * @param partition_id The partition of \c state to initialize.
+   * @param state The AggregationOperationState to initialize. Must not be
+   *        null; this work order does not take ownership of it.
+   **/
+  InitializeAggregationStateWorkOrder(const std::size_t query_id,
+                                      const std::size_t partition_id,
+                                      AggregationOperationState *state)
+      : WorkOrder(query_id),
+        partition_id_(partition_id),
+        state_(DCHECK_NOTNULL(state)) {}
+
+  ~InitializeAggregationStateWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const std::size_t partition_id_;  // Partition of state_ to initialize.
+
+  AggregationOperationState *state_;
+
+  DISALLOW_COPY_AND_ASSIGN(InitializeAggregationStateWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_STATE_OPERATOR_HPP_