You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/02/05 03:54:00 UTC
[09/11] incubator-quickstep git commit: - Adds
CollisionFreeVectorTable to support specialized fast path aggregation for
range-bounded single integer group-by key. - Supports copy elision for
aggregation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
index 6e6d188..798ba76 100644
--- a/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleMin_unittest.cpp
@@ -32,8 +32,9 @@
#include "expressions/aggregation/AggregationHandleMin.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "storage/AggregationOperationState.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "types/CharType.hpp"
#include "types/DatetimeIntervalType.hpp"
#include "types/DatetimeLit.hpp"
@@ -50,10 +51,7 @@
#include "types/VarCharType.hpp"
#include "types/YearMonthIntervalType.hpp"
#include "types/containers/ColumnVector.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#endif
#include "types/operations/comparisons/Comparison.hpp"
#include "types/operations/comparisons/ComparisonFactory.hpp"
@@ -222,34 +220,6 @@ class AggregationHandleMinTest : public ::testing::Test {
}
template <typename GenericType>
- void checkAggregationMinGenericColumnVector() {
- const GenericType &type = GenericType::Instance(true);
- initializeHandle(type);
- EXPECT_TRUE(
- aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
- .isNull());
-
- typename GenericType::cpptype min;
- std::vector<std::unique_ptr<ColumnVector>> column_vectors;
- column_vectors.emplace_back(
- createColumnVectorGeneric<GenericType>(type, &min));
-
- std::unique_ptr<AggregationState> cv_state(
- aggregation_handle_min_->accumulateColumnVectors(column_vectors));
-
- // Test the state generated directly by accumulateColumnVectors(), and also
- // test after merging back.
- CheckMinValue<typename GenericType::cpptype>(
- min, *aggregation_handle_min_, *cv_state);
-
- aggregation_handle_min_->mergeStates(*cv_state,
- aggregation_handle_min_state_.get());
- CheckMinValue<typename GenericType::cpptype>(
- min, *aggregation_handle_min_, *aggregation_handle_min_state_);
- }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- template <typename GenericType>
void checkAggregationMinGenericValueAccessor() {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
@@ -265,7 +235,8 @@ class AggregationHandleMinTest : public ::testing::Test {
std::unique_ptr<AggregationState> va_state(
aggregation_handle_min_->accumulateValueAccessor(
- accessor.get(), std::vector<attribute_id>(1, 0)));
+ {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+ ValueAccessorMultiplexer(accessor.get())));
// Test the state generated directly by accumulateValueAccessor(), and also
// test after merging back.
@@ -277,7 +248,6 @@ class AggregationHandleMinTest : public ::testing::Test {
CheckMinValue<typename GenericType::cpptype>(
min, *aggregation_handle_min_, *aggregation_handle_min_state_);
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
template <typename StringType>
void checkAggregationMinString() {
@@ -382,33 +352,6 @@ class AggregationHandleMinTest : public ::testing::Test {
}
template <typename StringType, typename ColumnVectorType>
- void checkAggregationMinStringColumnVector() {
- const StringType &type = StringType::Instance(10, true);
- initializeHandle(type);
- EXPECT_TRUE(
- aggregation_handle_min_->finalize(*aggregation_handle_min_state_)
- .isNull());
-
- std::string min;
- std::vector<std::unique_ptr<ColumnVector>> column_vectors;
- column_vectors.emplace_back(
- createColumnVectorString<ColumnVectorType>(type, &min));
-
- std::unique_ptr<AggregationState> cv_state(
- aggregation_handle_min_->accumulateColumnVectors(column_vectors));
-
- // Test the state generated directly by accumulateColumnVectors(), and also
- // test after merging back.
- CheckMinString(min, *aggregation_handle_min_, *cv_state);
-
- aggregation_handle_min_->mergeStates(*cv_state,
- aggregation_handle_min_state_.get());
- CheckMinString(
- min, *aggregation_handle_min_, *aggregation_handle_min_state_);
- }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- template <typename StringType, typename ColumnVectorType>
void checkAggregationMinStringValueAccessor() {
const StringType &type = StringType::Instance(10, true);
initializeHandle(type);
@@ -423,7 +366,8 @@ class AggregationHandleMinTest : public ::testing::Test {
std::unique_ptr<AggregationState> va_state(
aggregation_handle_min_->accumulateValueAccessor(
- accessor.get(), std::vector<attribute_id>(1, 0)));
+ {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+ ValueAccessorMultiplexer(accessor.get())));
// Test the state generated directly by accumulateValueAccessor(), and also
// test after merging back.
@@ -434,7 +378,6 @@ class AggregationHandleMinTest : public ::testing::Test {
CheckMinString(
min, *aggregation_handle_min_, *aggregation_handle_min_state_);
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
std::unique_ptr<AggregationHandle> aggregation_handle_min_;
std::unique_ptr<AggregationState> aggregation_handle_min_state_;
@@ -511,43 +454,6 @@ TEST_F(AggregationHandleMinTest, VarCharTypeTest) {
checkAggregationMinString<VarCharType>();
}
-TEST_F(AggregationHandleMinTest, IntTypeColumnVectorTest) {
- checkAggregationMinGenericColumnVector<IntType>();
-}
-
-TEST_F(AggregationHandleMinTest, LongTypeColumnVectorTest) {
- checkAggregationMinGenericColumnVector<LongType>();
-}
-
-TEST_F(AggregationHandleMinTest, FloatTypeColumnVectorTest) {
- checkAggregationMinGenericColumnVector<FloatType>();
-}
-
-TEST_F(AggregationHandleMinTest, DoubleTypeColumnVectorTest) {
- checkAggregationMinGenericColumnVector<DoubleType>();
-}
-
-TEST_F(AggregationHandleMinTest, DatetimeTypeColumnVectorTest) {
- checkAggregationMinGenericColumnVector<DatetimeType>();
-}
-
-TEST_F(AggregationHandleMinTest, DatetimeIntervalTypeColumnVectorTest) {
- checkAggregationMinGenericColumnVector<DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleMinTest, YearMonthIntervalTypeColumnVectorTest) {
- checkAggregationMinGenericColumnVector<YearMonthIntervalType>();
-}
-
-TEST_F(AggregationHandleMinTest, CharTypeColumnVectorTest) {
- checkAggregationMinStringColumnVector<CharType, NativeColumnVector>();
-}
-
-TEST_F(AggregationHandleMinTest, VarCharTypeColumnVectorTest) {
- checkAggregationMinStringColumnVector<VarCharType, IndirectColumnVector>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
TEST_F(AggregationHandleMinTest, IntTypeValueAccessorTest) {
checkAggregationMinGenericValueAccessor<IntType>();
}
@@ -583,7 +489,6 @@ TEST_F(AggregationHandleMinTest, CharTypeValueAccessorTest) {
TEST_F(AggregationHandleMinTest, VarCharTypeValueAccessorTest) {
checkAggregationMinStringValueAccessor<VarCharType, IndirectColumnVector>();
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
#ifdef QUICKSTEP_DEBUG
TEST_F(AggregationHandleMinDeathTest, WrongTypeTest) {
@@ -685,28 +590,25 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
initializeHandle(int_non_null_type);
storage_manager_.reset(new StorageManager("./test_min_data"));
std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
- AggregationStateFastHashTableFactory::CreateResizable(
+ AggregationStateHashTableFactory::CreateResizable(
HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &int_non_null_type),
10,
- {aggregation_handle_min_.get()->getPayloadSize()},
{aggregation_handle_min_.get()},
storage_manager_.get()));
std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
- AggregationStateFastHashTableFactory::CreateResizable(
+ AggregationStateHashTableFactory::CreateResizable(
HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &int_non_null_type),
10,
- {aggregation_handle_min_.get()->getPayloadSize()},
{aggregation_handle_min_.get()},
storage_manager_.get()));
- AggregationStateFastHashTable *destination_hash_table_derived =
- static_cast<AggregationStateFastHashTable *>(
- destination_hash_table.get());
+ PackedPayloadHashTable *destination_hash_table_derived =
+ static_cast<PackedPayloadHashTable *>(destination_hash_table.get());
- AggregationStateFastHashTable *source_hash_table_derived =
- static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
+ PackedPayloadHashTable *source_hash_table_derived =
+ static_cast<PackedPayloadHashTable *>(source_hash_table.get());
AggregationHandleMin *aggregation_handle_min_derived =
static_cast<AggregationHandleMin *>(aggregation_handle_min_.get());
@@ -776,47 +678,47 @@ TEST_F(AggregationHandleMinTest, GroupByTableMergeTest) {
memcpy(buffer + 1,
common_key_source_state.get()->getPayloadAddress(),
aggregation_handle_min_.get()->getPayloadSize());
- source_hash_table_derived->putCompositeKey(common_key, buffer);
+ source_hash_table_derived->upsertCompositeKey(common_key, buffer);
memcpy(buffer + 1,
common_key_destination_state.get()->getPayloadAddress(),
aggregation_handle_min_.get()->getPayloadSize());
- destination_hash_table_derived->putCompositeKey(common_key, buffer);
+ destination_hash_table_derived->upsertCompositeKey(common_key, buffer);
memcpy(buffer + 1,
exclusive_key_source_state.get()->getPayloadAddress(),
aggregation_handle_min_.get()->getPayloadSize());
- source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+ source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer);
memcpy(buffer + 1,
exclusive_key_destination_state.get()->getPayloadAddress(),
aggregation_handle_min_.get()->getPayloadSize());
- destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
- buffer);
+ destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key,
+ buffer);
EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
EXPECT_EQ(2u, source_hash_table_derived->numEntries());
- AggregationOperationState::mergeGroupByHashTables(
- source_hash_table.get(), destination_hash_table.get());
+ HashTableMerger merger(destination_hash_table_derived);
+ source_hash_table_derived->forEachCompositeKey(&merger);
EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
CheckMinValue<int>(
common_key_source_min_val.getLiteral<int>(),
- aggregation_handle_min_derived->finalizeHashTableEntryFast(
+ aggregation_handle_min_derived->finalizeHashTableEntry(
destination_hash_table_derived->getSingleCompositeKey(common_key) +
1));
- CheckMinValue<int>(exclusive_key_destination_min_val.getLiteral<int>(),
- aggregation_handle_min_derived->finalizeHashTableEntryFast(
- destination_hash_table_derived->getSingleCompositeKey(
- exclusive_destination_key) +
- 1));
- CheckMinValue<int>(exclusive_key_source_min_val.getLiteral<int>(),
- aggregation_handle_min_derived->finalizeHashTableEntryFast(
- source_hash_table_derived->getSingleCompositeKey(
- exclusive_source_key) +
- 1));
+ CheckMinValue<int>(
+ exclusive_key_destination_min_val.getLiteral<int>(),
+ aggregation_handle_min_derived->finalizeHashTableEntry(
+ destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_destination_key) + 1));
+ CheckMinValue<int>(
+ exclusive_key_source_min_val.getLiteral<int>(),
+ aggregation_handle_min_derived->finalizeHashTableEntry(
+ source_hash_table_derived->getSingleCompositeKey(
+ exclusive_source_key) + 1));
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
index 1d1c084..31a35a0 100644
--- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
@@ -29,8 +29,9 @@
#include "expressions/aggregation/AggregationHandleSum.hpp"
#include "expressions/aggregation/AggregationID.hpp"
#include "storage/AggregationOperationState.hpp"
-#include "storage/FastHashTableFactory.hpp"
+#include "storage/PackedPayloadHashTable.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
#include "types/CharType.hpp"
#include "types/DatetimeIntervalType.hpp"
#include "types/DoubleType.hpp"
@@ -45,10 +46,7 @@
#include "types/VarCharType.hpp"
#include "types/YearMonthIntervalType.hpp"
#include "types/containers/ColumnVector.hpp"
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
#include "types/containers/ColumnVectorsValueAccessor.hpp"
-#endif
#include "gtest/gtest.h"
@@ -186,36 +184,6 @@ class AggregationHandleSumTest : public ::testing::Test {
}
template <typename GenericType, typename PrecisionType>
- void checkAggregationSumGenericColumnVector() {
- const GenericType &type = GenericType::Instance(true);
-
- initializeHandle(type);
- EXPECT_TRUE(
- aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
- .isNull());
-
- typename PrecisionType::cpptype sum;
- std::vector<std::unique_ptr<ColumnVector>> column_vectors;
- column_vectors.emplace_back(
- createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(
- type, &sum));
-
- std::unique_ptr<AggregationState> cv_state(
- aggregation_handle_sum_->accumulateColumnVectors(column_vectors));
-
- // Test the state generated directly by accumulateColumnVectors(), and also
- // test after merging back.
- CheckSumValue<typename PrecisionType::cpptype>(
- sum, *aggregation_handle_sum_, *cv_state);
-
- aggregation_handle_sum_->mergeStates(*cv_state,
- aggregation_handle_sum_state_.get());
- CheckSumValue<typename PrecisionType::cpptype>(
- sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
- }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- template <typename GenericType, typename PrecisionType>
void checkAggregationSumGenericValueAccessor() {
const GenericType &type = GenericType::Instance(true);
@@ -233,7 +201,8 @@ class AggregationHandleSumTest : public ::testing::Test {
std::unique_ptr<AggregationState> va_state(
aggregation_handle_sum_->accumulateValueAccessor(
- accessor.get(), std::vector<attribute_id>(1, 0)));
+ {MultiSourceAttributeId(ValueAccessorSource::kBase, 0)},
+ ValueAccessorMultiplexer(accessor.get())));
// Test the state generated directly by accumulateValueAccessor(), and also
// test after merging back.
@@ -245,7 +214,6 @@ class AggregationHandleSumTest : public ::testing::Test {
CheckSumValue<typename PrecisionType::cpptype>(
sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
std::unique_ptr<AggregationHandle> aggregation_handle_sum_;
std::unique_ptr<AggregationState> aggregation_handle_sum_state_;
@@ -306,33 +274,6 @@ TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeTest) {
checkAggregationSumGeneric<YearMonthIntervalType, YearMonthIntervalType>();
}
-TEST_F(AggregationHandleSumTest, IntTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<IntType, LongType>();
-}
-
-TEST_F(AggregationHandleSumTest, LongTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<LongType, LongType>();
-}
-
-TEST_F(AggregationHandleSumTest, FloatTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<FloatType, DoubleType>();
-}
-
-TEST_F(AggregationHandleSumTest, DoubleTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<DoubleType, DoubleType>();
-}
-
-TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<DatetimeIntervalType,
- DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<YearMonthIntervalType,
- YearMonthIntervalType>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
TEST_F(AggregationHandleSumTest, IntTypeValueAccessorTest) {
checkAggregationSumGenericValueAccessor<IntType, LongType>();
}
@@ -358,7 +299,6 @@ TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeValueAccessorTest) {
checkAggregationSumGenericValueAccessor<YearMonthIntervalType,
YearMonthIntervalType>();
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
#ifdef QUICKSTEP_DEBUG
TEST_F(AggregationHandleSumDeathTest, CharTypeTest) {
@@ -464,28 +404,25 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
initializeHandle(long_non_null_type);
storage_manager_.reset(new StorageManager("./test_sum_data"));
std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
- AggregationStateFastHashTableFactory::CreateResizable(
+ AggregationStateHashTableFactory::CreateResizable(
HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &long_non_null_type),
10,
- {aggregation_handle_sum_.get()->getPayloadSize()},
{aggregation_handle_sum_.get()},
storage_manager_.get()));
std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
- AggregationStateFastHashTableFactory::CreateResizable(
+ AggregationStateHashTableFactory::CreateResizable(
HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &long_non_null_type),
10,
- {aggregation_handle_sum_.get()->getPayloadSize()},
{aggregation_handle_sum_.get()},
storage_manager_.get()));
- AggregationStateFastHashTable *destination_hash_table_derived =
- static_cast<AggregationStateFastHashTable *>(
- destination_hash_table.get());
+ PackedPayloadHashTable *destination_hash_table_derived =
+ static_cast<PackedPayloadHashTable *>(destination_hash_table.get());
- AggregationStateFastHashTable *source_hash_table_derived =
- static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
+ PackedPayloadHashTable *source_hash_table_derived =
+ static_cast<PackedPayloadHashTable *>(source_hash_table.get());
AggregationHandleSum *aggregation_handle_sum_derived =
static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get());
@@ -563,49 +500,47 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
memcpy(buffer + 1,
common_key_source_state.get()->getPayloadAddress(),
aggregation_handle_sum_.get()->getPayloadSize());
- source_hash_table_derived->putCompositeKey(common_key, buffer);
+ source_hash_table_derived->upsertCompositeKey(common_key, buffer);
memcpy(buffer + 1,
common_key_destination_state.get()->getPayloadAddress(),
aggregation_handle_sum_.get()->getPayloadSize());
- destination_hash_table_derived->putCompositeKey(common_key, buffer);
+ destination_hash_table_derived->upsertCompositeKey(common_key, buffer);
memcpy(buffer + 1,
exclusive_key_source_state.get()->getPayloadAddress(),
aggregation_handle_sum_.get()->getPayloadSize());
- source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+ source_hash_table_derived->upsertCompositeKey(exclusive_source_key, buffer);
memcpy(buffer + 1,
exclusive_key_destination_state.get()->getPayloadAddress(),
aggregation_handle_sum_.get()->getPayloadSize());
- destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
- buffer);
+ destination_hash_table_derived->upsertCompositeKey(exclusive_destination_key,
+ buffer);
EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
EXPECT_EQ(2u, source_hash_table_derived->numEntries());
- AggregationOperationState::mergeGroupByHashTables(
- source_hash_table.get(), destination_hash_table.get());
+ HashTableMerger merger(destination_hash_table_derived);
+ source_hash_table_derived->forEachCompositeKey(&merger);
EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
CheckSumValue<std::int64_t>(
common_key_merged_val.getLiteral<std::int64_t>(),
- aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+ aggregation_handle_sum_derived->finalizeHashTableEntry(
destination_hash_table_derived->getSingleCompositeKey(common_key) +
1));
CheckSumValue<std::int64_t>(
exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
- aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+ aggregation_handle_sum_derived->finalizeHashTableEntry(
destination_hash_table_derived->getSingleCompositeKey(
- exclusive_destination_key) +
- 1));
+ exclusive_destination_key) + 1));
CheckSumValue<std::int64_t>(
exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
- aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+ aggregation_handle_sum_derived->finalizeHashTableEntry(
source_hash_table_derived->getSingleCompositeKey(
- exclusive_source_key) +
- 1));
+ exclusive_source_key) + 1));
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_execution/QueryContext.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 895c2ea..ed0f99c 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -200,20 +200,6 @@ class QueryContext {
}
/**
- * @brief Destroy the payloads from the aggregation hash tables.
- *
- * @warning After calling these methods, the hash table will be in an invalid
- * state. No other operation should be performed on them.
- *
- * @param id The ID of the AggregationOperationState.
- **/
- inline void destroyAggregationHashTablePayload(const aggregation_state_id id) {
- DCHECK_LT(id, aggregation_states_.size());
- DCHECK(aggregation_states_[id]);
- aggregation_states_[id]->destroyAggregationHashTablePayload();
- }
-
- /**
* @brief Whether the given GeneratorFunctionHandle id is valid.
*
* @param id The GeneratorFunctionHandle id.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7f90e11..a755832 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -64,6 +64,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunction_proto
+ quickstep_expressions_aggregation_AggregationID
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
@@ -125,6 +126,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_HashJoinOperator
+ quickstep_relationaloperators_InitializeAggregationOperator
quickstep_relationaloperators_InsertOperator
quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_relationaloperators_RelationalOperator
@@ -145,6 +147,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_storage_StorageBlockLayout_proto
quickstep_storage_SubBlockTypeRegistry
quickstep_types_Type
+ quickstep_types_TypeID
quickstep_types_Type_proto
quickstep_types_TypedValue
quickstep_types_TypedValue_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6918313..67c8739 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -49,6 +49,7 @@
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
@@ -105,6 +106,7 @@
#include "relational_operators/DropTableOperator.hpp"
#include "relational_operators/FinalizeAggregationOperator.hpp"
#include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InitializeAggregationOperator.hpp"
#include "relational_operators/InsertOperator.hpp"
#include "relational_operators/NestedLoopsJoinOperator.hpp"
#include "relational_operators/RelationalOperator.hpp"
@@ -126,6 +128,7 @@
#include "storage/SubBlockTypeRegistry.hpp"
#include "types/Type.hpp"
#include "types/Type.pb.h"
+#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/TypedValue.pb.h"
#include "types/containers/Tuple.pb.h"
@@ -371,6 +374,110 @@ void ExecutionGenerator::dropAllTemporaryRelations() {
}
}
+bool ExecutionGenerator::canUseCollisionFreeAggregation(
+ const P::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups,
+ std::size_t *max_num_groups) const {
+#ifdef QUICKSTEP_DISTRIBUTED
+ // Currently we cannot do this fast path with the distributed setting. See
+ // the TODOs at InitializeAggregationOperator::getAllWorkOrderProtos() and
+ // FinalizeAggregationOperator::getAllWorkOrderProtos().
+ return false;
+#endif
+
+ // Supports only single group-by key.
+ if (aggregate->grouping_expressions().size() != 1) {
+ return false;
+ }
+
+ // We need to know the exact min/max stats of the group-by key.
+ // So it must be a CatalogAttribute (but not an expression).
+ E::AttributeReferencePtr group_by_key_attr;
+ const E::ExpressionPtr agg_expr = aggregate->grouping_expressions().front();
+ if (!E::SomeAttributeReference::MatchesWithConditionalCast(agg_expr, &group_by_key_attr)) {
+ return false;
+ }
+
+ bool min_value_stat_is_exact;
+ bool max_value_stat_is_exact;
+ const TypedValue min_value =
+ cost_model_for_aggregation_->findMinValueStat(
+ aggregate, group_by_key_attr, &min_value_stat_is_exact);
+ const TypedValue max_value =
+ cost_model_for_aggregation_->findMaxValueStat(
+ aggregate, group_by_key_attr, &max_value_stat_is_exact);
+ if (min_value.isNull() || max_value.isNull() ||
+ (!min_value_stat_is_exact) || (!max_value_stat_is_exact)) {
+ return false;
+ }
+
+ std::int64_t min_cpp_value;
+ std::int64_t max_cpp_value;
+ switch (group_by_key_attr->getValueType().getTypeID()) {
+ case TypeID::kInt: {
+ min_cpp_value = min_value.getLiteral<int>();
+ max_cpp_value = max_value.getLiteral<int>();
+ break;
+ }
+ case TypeID::kLong: {
+ min_cpp_value = min_value.getLiteral<std::int64_t>();
+ max_cpp_value = max_value.getLiteral<std::int64_t>();
+ break;
+ }
+ default:
+ return false;
+ }
+
+ // TODO(jianqiao):
+ // 1. Handle the case where min_cpp_value is below 0 or far greater than 0.
+ // 2. Reason about the upper bound (e.g. by checking memory size) instead of
+ // hardcoding it here.
+ const std::int64_t kGroupSizeUpbound = 1000000000;
+ if (min_cpp_value < 0 ||
+ max_cpp_value > kGroupSizeUpbound ||
+ max_cpp_value / static_cast<double>(estimated_num_groups) > 256.0) {
+ return false;
+ }
+
+ for (const auto &agg_expr : aggregate->aggregate_expressions()) {
+ const E::AggregateFunctionPtr agg_func =
+ std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+ if (agg_func->is_distinct()) {
+ return false;
+ }
+
+ // TODO(jianqiao): Support AggregationID::AVG.
+ switch (agg_func->getAggregate().getAggregationID()) {
+ case AggregationID::kCount: // Fall through
+ case AggregationID::kSum:
+ break;
+ default:
+ return false;
+ }
+
+ const auto &arguments = agg_func->getArguments();
+ if (arguments.size() > 1) {
+ return false;
+ }
+
+ if (arguments.size() == 1) {
+ switch (arguments.front()->getValueType().getTypeID()) {
+ case TypeID::kInt: // Fall through
+ case TypeID::kLong:
+ case TypeID::kFloat:
+ case TypeID::kDouble:
+ break;
+ default:
+ return false;
+ }
+ }
+ }
+
+ *max_num_groups = static_cast<std::size_t>(max_cpp_value) + 1;
+ return true;
+}
+
void ExecutionGenerator::convertNamedExpressions(
const std::vector<E::NamedExpressionPtr> &named_expressions,
S::QueryContext::ScalarGroup *scalar_group_proto) {
@@ -1475,6 +1582,8 @@ void ExecutionGenerator::convertAggregate(
findRelationInfoOutputByPhysical(physical_plan->input());
aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ bool use_parallel_initialization = false;
+
std::vector<const Type*> group_by_types;
for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
unique_ptr<const Scalar> execution_group_by_expression;
@@ -1495,9 +1604,28 @@ void ExecutionGenerator::convertAggregate(
}
if (!group_by_types.empty()) {
- // Right now, only SeparateChaining is supported.
- aggr_state_proto->set_hash_table_impl_type(
- serialization::HashTableImplType::SEPARATE_CHAINING);
+ const std::size_t estimated_num_groups =
+ cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
+
+ std::size_t max_num_groups;
+ const bool can_use_collision_free_aggregation =
+ canUseCollisionFreeAggregation(physical_plan,
+ estimated_num_groups,
+ &max_num_groups);
+
+ if (can_use_collision_free_aggregation) {
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::COLLISION_FREE_VECTOR);
+ aggr_state_proto->set_estimated_num_entries(max_num_groups);
+ use_parallel_initialization = true;
+ } else {
+ // Otherwise, use SeparateChaining.
+ aggr_state_proto->set_hash_table_impl_type(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
+ }
+ } else {
+ aggr_state_proto->set_estimated_num_entries(1uL);
}
for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1535,10 +1663,6 @@ void ExecutionGenerator::convertAggregate(
aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
}
- const std::size_t estimated_num_groups =
- cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
- aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
-
const QueryPlan::DAGNodeIndex aggregation_operator_index =
execution_plan_->addRelationalOperator(
new AggregationOperator(
@@ -1553,6 +1677,18 @@ void ExecutionGenerator::convertAggregate(
false /* is_pipeline_breaker */);
}
+ if (use_parallel_initialization) {
+ const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
+ execution_plan_->addRelationalOperator(
+ new InitializeAggregationOperator(
+ query_handle_->query_id(),
+ aggr_state_index));
+
+ execution_plan_->addDirectDependency(aggregation_operator_index,
+ initialize_aggregation_operator_index,
+ true /* is_pipeline_breaker */);
+ }
+
// Create InsertDestination proto.
const CatalogRelation *output_relation = nullptr;
const QueryContext::insert_destination_id insert_destination_index =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index eba6eee..987f11a 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
#define QUICKSTEP_QUERY_OPTIMIZER_EXECUTION_GENERATOR_HPP_
+#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
@@ -37,6 +38,7 @@
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
@@ -204,6 +206,22 @@ class ExecutionGenerator {
std::string getNewRelationName();
/**
+ * @brief Checks whether an aggregate node can be efficiently evaluated with
+ * the collision-free aggregation fast path.
+ *
+ * @param aggregate The physical aggregate node to be checked.
+ * @param estimated_num_groups The estimated number of groups for the aggregate.
+ * @param max_num_groups If collision-free aggregation is applicable, the
+ * value pointed to by this pointer will be set to the maximum possible
+ * number of groups that the collision-free hash table needs to hold.
+ * @return A bool value indicating whether collision-free aggregation can be
+ * used to evaluate \p aggregate.
+ */
+ bool canUseCollisionFreeAggregation(const physical::AggregatePtr &aggregate,
+ const std::size_t estimated_num_groups,
+ std::size_t *max_num_groups) const;
+
+ /**
* @brief Sets up the info of the CatalogRelation represented by TableReference.
* TableReference is not converted to any operator.
*
@@ -427,7 +445,7 @@ class ExecutionGenerator {
/**
* @brief The cost model to use for estimating aggregation hash table size.
*/
- std::unique_ptr<cost::CostModel> cost_model_for_aggregation_;
+ std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_for_aggregation_;
/**
* @brief The cost model to use for estimating join hash table size.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c18dc77..df4114d 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -47,6 +47,9 @@ add_library(quickstep_relationaloperators_FinalizeAggregationOperator
FinalizeAggregationOperator.cpp
FinalizeAggregationOperator.hpp)
add_library(quickstep_relationaloperators_HashJoinOperator HashJoinOperator.cpp HashJoinOperator.hpp)
+add_library(quickstep_relationaloperators_InitializeAggregationOperator
+ InitializeAggregationOperator.cpp
+ InitializeAggregationOperator.hpp)
add_library(quickstep_relationaloperators_InsertOperator InsertOperator.cpp InsertOperator.hpp)
add_library(quickstep_relationaloperators_NestedLoopsJoinOperator
NestedLoopsJoinOperator.cpp
@@ -254,6 +257,17 @@ target_link_libraries(quickstep_relationaloperators_HashJoinOperator
quickstep_utility_lipfilter_LIPFilterAdaptiveProber
quickstep_utility_lipfilter_LIPFilterUtil
tmb)
+target_link_libraries(quickstep_relationaloperators_InitializeAggregationOperator
+ glog
+ quickstep_queryexecution_QueryContext
+ quickstep_queryexecution_WorkOrderProtosContainer
+ quickstep_queryexecution_WorkOrdersContainer
+ quickstep_relationaloperators_RelationalOperator
+ quickstep_relationaloperators_WorkOrder
+ quickstep_relationaloperators_WorkOrder_proto
+ quickstep_storage_AggregationOperationState
+ quickstep_utility_Macros
+ tmb)
target_link_libraries(quickstep_relationaloperators_InsertOperator
glog
quickstep_catalog_CatalogRelation
@@ -548,6 +562,7 @@ target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_DropTableOperator
quickstep_relationaloperators_FinalizeAggregationOperator
quickstep_relationaloperators_HashJoinOperator
+ quickstep_relationaloperators_InitializeAggregationOperator
quickstep_relationaloperators_InsertOperator
quickstep_relationaloperators_NestedLoopsJoinOperator
quickstep_relationaloperators_RebuildWorkOrder
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/DestroyAggregationStateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 49be43d..62ca9e7 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -58,13 +58,6 @@ bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosConta
}
void DestroyAggregationStateWorkOrder::execute() {
- // NOTE(harshad) : The destroyAggregationHashTablePayload call is separate
- // from the destroyAggregationState call. The reason is that the aggregation
- // hash tables don't own the AggregationHandle objects. However the hash table
- // class requires the handles for destroying the payload (see the
- // destroyPayload methods in AggregationHandle classes). Therefore, we first
- // destroy the payloads in the hash table and then destroy the hash table.
- query_context_->destroyAggregationHashTablePayload(aggr_state_index_);
query_context_->destroyAggregationState(aggr_state_index_);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/FinalizeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 0cbf635..77b4b97 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -44,15 +44,15 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
AggregationOperationState *agg_state =
query_context->getAggregationState(aggr_state_index_);
DCHECK(agg_state != nullptr);
- for (int part_id = 0;
- part_id < static_cast<int>(agg_state->getNumPartitions());
+ for (std::size_t part_id = 0;
+ part_id < agg_state->getNumFinalizationPartitions();
++part_id) {
container->addNormalWorkOrder(
new FinalizeAggregationWorkOrder(
query_id_,
+ part_id,
agg_state,
- query_context->getInsertDestination(output_destination_index_),
- part_id),
+ query_context->getInsertDestination(output_destination_index_)),
op_index_);
}
}
@@ -61,7 +61,7 @@ bool FinalizeAggregationOperator::getAllWorkOrders(
// TODO(quickstep-team) : Think about how the number of partitions could be
// accessed in this function. Until then, we can't use partitioned aggregation
-// with the distributed version.
+// finalization with the distributed version.
bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (blocking_dependencies_met_ && !started_) {
started_ = true;
@@ -80,11 +80,7 @@ bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer
}
void FinalizeAggregationWorkOrder::execute() {
- if (state_->isAggregatePartitioned()) {
- state_->finalizeAggregatePartitioned(part_id_, output_destination_);
- } else {
- state_->finalizeAggregate(output_destination_);
- }
+ state_->finalizeAggregate(partition_id_, output_destination_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/FinalizeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index ae7127a..3c209b1 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -116,29 +116,29 @@ class FinalizeAggregationWorkOrder : public WorkOrder {
* @note InsertWorkOrder takes ownership of \c state.
*
* @param query_id The ID of the query to which this operator belongs.
+ * @param partition_id The partition ID for which the Finalize aggregation
+ * work order is issued.
* @param state The AggregationState to use.
* @param output_destination The InsertDestination to insert aggregation
* results.
- * @param part_id The partition ID for which the Finalize aggregation work
- * order is issued. Ignore if aggregation is not partitioned.
*/
FinalizeAggregationWorkOrder(const std::size_t query_id,
+ const std::size_t partition_id,
AggregationOperationState *state,
- InsertDestination *output_destination,
- const int part_id = -1)
+ InsertDestination *output_destination)
: WorkOrder(query_id),
+ partition_id_(partition_id),
state_(DCHECK_NOTNULL(state)),
- output_destination_(DCHECK_NOTNULL(output_destination)),
- part_id_(part_id) {}
+ output_destination_(DCHECK_NOTNULL(output_destination)) {}
~FinalizeAggregationWorkOrder() override {}
void execute() override;
private:
+ const std::size_t partition_id_;
AggregationOperationState *state_;
InsertDestination *output_destination_;
- const int part_id_;
DISALLOW_COPY_AND_ASSIGN(FinalizeAggregationWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/InitializeAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
new file mode 100644
index 0000000..b1063ad
--- /dev/null
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/InitializeAggregationOperator.hpp"
+
+#include <cstddef>
+
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/AggregationOperationState.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool InitializeAggregationOperator::getAllWorkOrders(
+ WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) {
+ if (!started_) {
+ AggregationOperationState *agg_state =
+ query_context->getAggregationState(aggr_state_index_);
+ DCHECK(agg_state != nullptr);
+
+ for (std::size_t part_id = 0;
+ part_id < agg_state->getNumInitializationPartitions();
+ ++part_id) {
+ container->addNormalWorkOrder(
+ new InitializeAggregationWorkOrder(query_id_,
+ part_id,
+ agg_state),
+ op_index_);
+ }
+ started_ = true;
+ }
+ return true;
+}
+
+// TODO(quickstep-team) : Think about how the number of partitions could be
+// accessed in this function. Until then, we can't use partitioned aggregation
+// initialization with the distributed version.
+bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ LOG(FATAL) << "Not supported";
+}
+
+void InitializeAggregationWorkOrder::execute() {
+ state_->initialize(partition_id_);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/InitializeAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
new file mode 100644
index 0000000..58d848b
--- /dev/null
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_
+
+#include <string>
+
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class AggregationOperationState;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ * @{
+ */
+
+/**
+ * @brief An operator which initializes an AggregationOperationState.
+ **/
+class InitializeAggregationOperator : public RelationalOperator {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_id The ID of this query.
+ * @param aggr_state_index The index of the AggregationOperationState in QueryContext.
+ **/
+ InitializeAggregationOperator(const std::size_t query_id,
+ const QueryContext::aggregation_state_id aggr_state_index)
+ : RelationalOperator(query_id),
+ aggr_state_index_(aggr_state_index),
+ started_(false) {}
+
+ ~InitializeAggregationOperator() override {}
+
+ std::string getName() const override {
+ return "InitializeAggregationOperator";
+ }
+
+ bool getAllWorkOrders(WorkOrdersContainer *container,
+ QueryContext *query_context,
+ StorageManager *storage_manager,
+ const tmb::client_id scheduler_client_id,
+ tmb::MessageBus *bus) override;
+
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+ private:
+ const QueryContext::aggregation_state_id aggr_state_index_;
+ bool started_;
+
+ DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by InitializeAggregationOperator.
+ **/
+class InitializeAggregationWorkOrder : public WorkOrder {
+ public:
+ /**
+ * @brief Constructor.
+ *
+ * @param query_id The ID of the query to which this work order belongs.
+ * @param partition_id The partition ID for which the work order is issued.
+ * @param state The AggregationOperationState to be initialized.
+ */
+ InitializeAggregationWorkOrder(const std::size_t query_id,
+ const std::size_t partition_id,
+ AggregationOperationState *state)
+ : WorkOrder(query_id),
+ partition_id_(partition_id),
+ state_(DCHECK_NOTNULL(state)) {}
+
+ ~InitializeAggregationWorkOrder() override {}
+
+ void execute() override;
+
+ private:
+ const std::size_t partition_id_;
+
+ AggregationOperationState *state_;
+
+ DISALLOW_COPY_AND_ASSIGN(InitializeAggregationWorkOrder);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_RELATIONAL_OPERATORS_INITIALIZE_AGGREGATION_OPERATOR_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/68be4a61/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 5e8d03d..306bd1a 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -186,6 +186,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
LOG(INFO) << "Creating FinalizeAggregationWorkOrder in Shiftboss " << shiftboss_index;
return new FinalizeAggregationWorkOrder(
proto.query_id(),
+ 0uL,
query_context->getAggregationState(proto.GetExtension(
serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
query_context->getInsertDestination(