You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/09/16 18:29:38 UTC
[14/29] incubator-quickstep git commit: Modified Aggregation unit test. Ran clang-format.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a9efc4e/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
index 0e35151..1d1c084 100644
--- a/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
+++ b/expressions/aggregation/tests/AggregationHandleSum_unittest.cpp
@@ -28,6 +28,8 @@
#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/aggregation/AggregationHandleSum.hpp"
#include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationOperationState.hpp"
+#include "storage/FastHashTableFactory.hpp"
#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DatetimeIntervalType.hpp"
@@ -52,51 +54,56 @@
namespace quickstep {
-class AggregationHandleSumTest : public::testing::Test {
+class AggregationHandleSumTest : public ::testing::Test {
protected:
static const int kNumSamples = 1000;
// Helper method that calls AggregationHandleSum::iterateUnaryInl() to
// aggregate 'value' into '*state'.
void iterateHandle(AggregationState *state, const TypedValue &value) {
- static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_).iterateUnaryInl(
- static_cast<AggregationStateSum*>(state),
- value);
+ static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_)
+ .iterateUnaryInl(static_cast<AggregationStateSum *>(state), value);
}
void initializeHandle(const Type &type) {
aggregation_handle_sum_.reset(
- AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
- std::vector<const Type*>(1, &type)));
+ AggregateFunctionFactory::Get(AggregationID::kSum)
+ .createHandle(std::vector<const Type *>(1, &type)));
aggregation_handle_sum_state_.reset(
aggregation_handle_sum_->createInitialState());
}
static bool ApplyToTypesTest(TypeID typeID) {
- const Type &type = (typeID == kChar || typeID == kVarChar) ?
- TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
- TypeFactory::GetType(typeID);
+ const Type &type =
+ (typeID == kChar || typeID == kVarChar)
+ ? TypeFactory::GetType(typeID, static_cast<std::size_t>(10))
+ : TypeFactory::GetType(typeID);
- return AggregateFunctionFactory::Get(AggregationID::kSum).canApplyToTypes(
- std::vector<const Type*>(1, &type));
+ return AggregateFunctionFactory::Get(AggregationID::kSum)
+ .canApplyToTypes(std::vector<const Type *>(1, &type));
}
static bool ResultTypeForArgumentTypeTest(TypeID input_type_id,
TypeID output_type_id) {
- const Type *result_type
- = AggregateFunctionFactory::Get(AggregationID::kSum).resultTypeForArgumentTypes(
- std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+ const Type *result_type =
+ AggregateFunctionFactory::Get(AggregationID::kSum)
+ .resultTypeForArgumentTypes(std::vector<const Type *>(
+ 1, &TypeFactory::GetType(input_type_id)));
return (result_type->getTypeID() == output_type_id);
}
template <typename CppType>
- static void CheckSumValue(
- CppType expected,
- const AggregationHandle &target,
- const AggregationState &state) {
+ static void CheckSumValue(CppType expected,
+ const AggregationHandle &target,
+ const AggregationState &state) {
EXPECT_EQ(expected, target.finalize(state).getLiteral<CppType>());
}
+ template <typename CppType>
+ static void CheckSumValue(CppType expected, const TypedValue &value) {
+ EXPECT_EQ(expected, value.getLiteral<CppType>());
+ }
+
// Static templated method to set a meaningful to data types.
template <typename CppType>
static void SetDataType(int value, CppType *data) {
@@ -108,7 +115,9 @@ class AggregationHandleSumTest : public::testing::Test {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+ EXPECT_TRUE(
+ aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+ .isNull());
typename GenericType::cpptype val;
typename PrecisionType::cpptype sum;
@@ -119,13 +128,14 @@ class AggregationHandleSumTest : public::testing::Test {
if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
SetDataType(i - 10, &val);
} else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
+ SetDataType(static_cast<float>(i - 10) / 10, &val);
}
iterateHandle(aggregation_handle_sum_state_.get(), type.makeValue(&val));
sum += val;
}
iterateHandle(aggregation_handle_sum_state_.get(), type.makeNullValue());
- CheckSumValue<typename PrecisionType::cpptype>(sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
+ CheckSumValue<typename PrecisionType::cpptype>(
+ sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
// Test mergeStates().
std::unique_ptr<AggregationState> merge_state(
@@ -138,7 +148,7 @@ class AggregationHandleSumTest : public::testing::Test {
if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
SetDataType(i - 10, &val);
} else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
+ SetDataType(static_cast<float>(i - 10) / 10, &val);
}
iterateHandle(merge_state.get(), type.makeValue(&val));
sum += val;
@@ -146,13 +156,11 @@ class AggregationHandleSumTest : public::testing::Test {
aggregation_handle_sum_->mergeStates(*merge_state,
aggregation_handle_sum_state_.get());
CheckSumValue<typename PrecisionType::cpptype>(
- sum,
- *aggregation_handle_sum_,
- *aggregation_handle_sum_state_);
+ sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
}
template <typename GenericType, typename Output>
- ColumnVector *createColumnVectorGeneric(const Type &type, Output *sum) {
+ ColumnVector* createColumnVectorGeneric(const Type &type, Output *sum) {
NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
typename GenericType::cpptype val;
@@ -163,12 +171,12 @@ class AggregationHandleSumTest : public::testing::Test {
if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
SetDataType(i - 10, &val);
} else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
+ SetDataType(static_cast<float>(i - 10) / 10, &val);
}
column->appendTypedValue(type.makeValue(&val));
*sum += val;
// One NULL in the middle.
- if (i == kNumSamples/2) {
+ if (i == kNumSamples / 2) {
column->appendTypedValue(type.makeNullValue());
}
}
@@ -182,12 +190,15 @@ class AggregationHandleSumTest : public::testing::Test {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+ EXPECT_TRUE(
+ aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+ .isNull());
typename PrecisionType::cpptype sum;
std::vector<std::unique_ptr<ColumnVector>> column_vectors;
column_vectors.emplace_back(
- createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(type, &sum));
+ createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(
+ type, &sum));
std::unique_ptr<AggregationState> cv_state(
aggregation_handle_sum_->accumulateColumnVectors(column_vectors));
@@ -195,15 +206,12 @@ class AggregationHandleSumTest : public::testing::Test {
// Test the state generated directly by accumulateColumnVectors(), and also
// test after merging back.
CheckSumValue<typename PrecisionType::cpptype>(
- sum,
- *aggregation_handle_sum_,
- *cv_state);
+ sum, *aggregation_handle_sum_, *cv_state);
- aggregation_handle_sum_->mergeStates(*cv_state, aggregation_handle_sum_state_.get());
+ aggregation_handle_sum_->mergeStates(*cv_state,
+ aggregation_handle_sum_state_.get());
CheckSumValue<typename PrecisionType::cpptype>(
- sum,
- *aggregation_handle_sum_,
- *aggregation_handle_sum_state_);
+ sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -212,29 +220,30 @@ class AggregationHandleSumTest : public::testing::Test {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_).isNull());
+ EXPECT_TRUE(
+ aggregation_handle_sum_->finalize(*aggregation_handle_sum_state_)
+ .isNull());
typename PrecisionType::cpptype sum;
- std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
+ std::unique_ptr<ColumnVectorsValueAccessor> accessor(
+ new ColumnVectorsValueAccessor());
accessor->addColumn(
- createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(type, &sum));
+ createColumnVectorGeneric<GenericType, typename PrecisionType::cpptype>(
+ type, &sum));
std::unique_ptr<AggregationState> va_state(
- aggregation_handle_sum_->accumulateValueAccessor(accessor.get(),
- std::vector<attribute_id>(1, 0)));
+ aggregation_handle_sum_->accumulateValueAccessor(
+ accessor.get(), std::vector<attribute_id>(1, 0)));
// Test the state generated directly by accumulateValueAccessor(), and also
// test after merging back.
CheckSumValue<typename PrecisionType::cpptype>(
- sum,
- *aggregation_handle_sum_,
- *va_state);
+ sum, *aggregation_handle_sum_, *va_state);
- aggregation_handle_sum_->mergeStates(*va_state, aggregation_handle_sum_state_.get());
+ aggregation_handle_sum_->mergeStates(*va_state,
+ aggregation_handle_sum_state_.get());
CheckSumValue<typename PrecisionType::cpptype>(
- sum,
- *aggregation_handle_sum_,
- *aggregation_handle_sum_state_);
+ sum, *aggregation_handle_sum_, *aggregation_handle_sum_state_);
}
#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -247,9 +256,7 @@ const int AggregationHandleSumTest::kNumSamples;
template <>
void AggregationHandleSumTest::CheckSumValue<float>(
- float val,
- const AggregationHandle &handle,
- const AggregationState &state) {
+ float val, const AggregationHandle &handle, const AggregationState &state) {
EXPECT_FLOAT_EQ(val, handle.finalize(state).getLiteral<float>());
}
@@ -262,12 +269,14 @@ void AggregationHandleSumTest::CheckSumValue<double>(
}
template <>
-void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
+void AggregationHandleSumTest::SetDataType<DatetimeIntervalLit>(
+ int value, DatetimeIntervalLit *data) {
data->interval_ticks = value;
}
template <>
-void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
+void AggregationHandleSumTest::SetDataType<YearMonthIntervalLit>(
+ int value, YearMonthIntervalLit *data) {
data->months = value;
}
@@ -314,11 +323,13 @@ TEST_F(AggregationHandleSumTest, DoubleTypeColumnVectorTest) {
}
TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
+ checkAggregationSumGenericColumnVector<DatetimeIntervalType,
+ DatetimeIntervalType>();
}
TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeColumnVectorTest) {
- checkAggregationSumGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
+ checkAggregationSumGenericColumnVector<YearMonthIntervalType,
+ YearMonthIntervalType>();
}
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -339,11 +350,13 @@ TEST_F(AggregationHandleSumTest, DoubleTypeValueAccessorTest) {
}
TEST_F(AggregationHandleSumTest, DatetimeIntervalTypeValueAccessorTest) {
- checkAggregationSumGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
+ checkAggregationSumGenericValueAccessor<DatetimeIntervalType,
+ DatetimeIntervalType>();
}
TEST_F(AggregationHandleSumTest, YearMonthIntervalTypeValueAccessorTest) {
- checkAggregationSumGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
+ checkAggregationSumGenericValueAccessor<YearMonthIntervalType,
+ YearMonthIntervalType>();
}
#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -373,38 +386,53 @@ TEST_F(AggregationHandleSumDeathTest, WrongTypeTest) {
float float_val = 0;
// Passes.
- iterateHandle(aggregation_handle_sum_state_.get(), int_non_null_type.makeValue(&int_val));
+ iterateHandle(aggregation_handle_sum_state_.get(),
+ int_non_null_type.makeValue(&int_val));
- EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), long_type.makeValue(&long_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), double_type.makeValue(&double_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), float_type.makeValue(&float_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), char_type.makeValue("asdf", 5)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(), varchar_type.makeValue("asdf", 5)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+ long_type.makeValue(&long_val)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+ double_type.makeValue(&double_val)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+ float_type.makeValue(&float_val)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+ char_type.makeValue("asdf", 5)),
+ "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_sum_state_.get(),
+ varchar_type.makeValue("asdf", 5)),
+ "");
// Test mergeStates() with incorrectly typed handles.
std::unique_ptr<AggregationHandle> aggregation_handle_sum_double(
- AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
- std::vector<const Type*>(1, &double_type)));
+ AggregateFunctionFactory::Get(AggregationID::kSum)
+ .createHandle(std::vector<const Type *>(1, &double_type)));
std::unique_ptr<AggregationState> aggregation_state_sum_merge_double(
aggregation_handle_sum_double->createInitialState());
- static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_double).iterateUnaryInl(
- static_cast<AggregationStateSum*>(aggregation_state_sum_merge_double.get()),
- double_type.makeValue(&double_val));
- EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double,
- aggregation_handle_sum_state_.get()),
- "");
+ static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_double)
+ .iterateUnaryInl(static_cast<AggregationStateSum *>(
+ aggregation_state_sum_merge_double.get()),
+ double_type.makeValue(&double_val));
+ EXPECT_DEATH(
+ aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_double,
+ aggregation_handle_sum_state_.get()),
+ "");
std::unique_ptr<AggregationHandle> aggregation_handle_sum_float(
- AggregateFunctionFactory::Get(AggregationID::kSum).createHandle(
- std::vector<const Type*>(1, &float_type)));
+ AggregateFunctionFactory::Get(AggregationID::kSum)
+ .createHandle(std::vector<const Type *>(1, &float_type)));
std::unique_ptr<AggregationState> aggregation_state_sum_merge_float(
aggregation_handle_sum_float->createInitialState());
- static_cast<const AggregationHandleSum&>(*aggregation_handle_sum_float).iterateUnaryInl(
- static_cast<AggregationStateSum*>(aggregation_state_sum_merge_float.get()),
- float_type.makeValue(&float_val));
- EXPECT_DEATH(aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float,
- aggregation_handle_sum_state_.get()),
- "");
+ static_cast<const AggregationHandleSum &>(*aggregation_handle_sum_float)
+ .iterateUnaryInl(static_cast<AggregationStateSum *>(
+ aggregation_state_sum_merge_float.get()),
+ float_type.makeValue(&float_val));
+ EXPECT_DEATH(
+ aggregation_handle_sum_->mergeStates(*aggregation_state_sum_merge_float,
+ aggregation_handle_sum_state_.get()),
+ "");
}
#endif
@@ -425,8 +453,10 @@ TEST_F(AggregationHandleSumTest, ResultTypeForArgumentTypeTest) {
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kLong));
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
+ EXPECT_TRUE(
+ ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
+ EXPECT_TRUE(
+ ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
}
TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
@@ -434,25 +464,28 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
initializeHandle(long_non_null_type);
storage_manager_.reset(new StorageManager("./test_sum_data"));
std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
- aggregation_handle_sum_->createGroupByHashTable(
- HashTableImplType::kSimpleScalarSeparateChaining,
+ AggregationStateFastHashTableFactory::CreateResizable(
+ HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &long_non_null_type),
10,
+ {aggregation_handle_sum_.get()->getPayloadSize()},
+ {aggregation_handle_sum_.get()},
storage_manager_.get()));
std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
- aggregation_handle_sum_->createGroupByHashTable(
- HashTableImplType::kSimpleScalarSeparateChaining,
+ AggregationStateFastHashTableFactory::CreateResizable(
+ HashTableImplType::kSeparateChaining,
std::vector<const Type *>(1, &long_non_null_type),
10,
+ {aggregation_handle_sum_.get()->getPayloadSize()},
+ {aggregation_handle_sum_.get()},
storage_manager_.get()));
- AggregationStateHashTable<AggregationStateSum> *destination_hash_table_derived =
- static_cast<AggregationStateHashTable<AggregationStateSum> *>(
+ AggregationStateFastHashTable *destination_hash_table_derived =
+ static_cast<AggregationStateFastHashTable *>(
destination_hash_table.get());
- AggregationStateHashTable<AggregationStateSum> *source_hash_table_derived =
- static_cast<AggregationStateHashTable<AggregationStateSum> *>(
- source_hash_table.get());
+ AggregationStateFastHashTable *source_hash_table_derived =
+ static_cast<AggregationStateFastHashTable *>(source_hash_table.get());
AggregationHandleSum *aggregation_handle_sum_derived =
static_cast<AggregationHandleSum *>(aggregation_handle_sum_.get());
@@ -471,7 +504,8 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
const std::int64_t common_key_destination_sum = 4000;
TypedValue common_key_destination_sum_val(common_key_destination_sum);
- const std::int64_t merged_common_key = common_key_source_sum + common_key_destination_sum;
+ const std::int64_t merged_common_key =
+ common_key_source_sum + common_key_destination_sum;
TypedValue common_key_merged_val(merged_common_key);
const std::int64_t exclusive_key_source_sum = 100;
@@ -496,59 +530,82 @@ TEST_F(AggregationHandleSumTest, GroupByTableMergeTest) {
// Create sum value states for keys.
aggregation_handle_sum_derived->iterateUnaryInl(common_key_source_state.get(),
common_key_source_sum_val);
- std::int64_t actual_val = aggregation_handle_sum_->finalize(*common_key_source_state)
- .getLiteral<std::int64_t>();
+ std::int64_t actual_val =
+ aggregation_handle_sum_->finalize(*common_key_source_state)
+ .getLiteral<std::int64_t>();
EXPECT_EQ(common_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
aggregation_handle_sum_derived->iterateUnaryInl(
common_key_destination_state.get(), common_key_destination_sum_val);
actual_val = aggregation_handle_sum_->finalize(*common_key_destination_state)
.getLiteral<std::int64_t>();
- EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+ EXPECT_EQ(common_key_destination_sum_val.getLiteral<std::int64_t>(),
+ actual_val);
aggregation_handle_sum_derived->iterateUnaryInl(
exclusive_key_destination_state.get(), exclusive_key_destination_sum_val);
actual_val =
aggregation_handle_sum_->finalize(*exclusive_key_destination_state)
.getLiteral<std::int64_t>();
- EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(), actual_val);
+ EXPECT_EQ(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+ actual_val);
aggregation_handle_sum_derived->iterateUnaryInl(
exclusive_key_source_state.get(), exclusive_key_source_sum_val);
actual_val = aggregation_handle_sum_->finalize(*exclusive_key_source_state)
.getLiteral<std::int64_t>();
- EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(), actual_val);
+ EXPECT_EQ(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+ actual_val);
// Add the key-state pairs to the hash tables.
- source_hash_table_derived->putCompositeKey(common_key,
- *common_key_source_state);
- destination_hash_table_derived->putCompositeKey(
- common_key, *common_key_destination_state);
- source_hash_table_derived->putCompositeKey(exclusive_source_key,
- *exclusive_key_source_state);
- destination_hash_table_derived->putCompositeKey(
- exclusive_destination_key, *exclusive_key_destination_state);
+ unsigned char buffer[100];
+ buffer[0] = '\0';
+ memcpy(buffer + 1,
+ common_key_source_state.get()->getPayloadAddress(),
+ aggregation_handle_sum_.get()->getPayloadSize());
+ source_hash_table_derived->putCompositeKey(common_key, buffer);
+
+ memcpy(buffer + 1,
+ common_key_destination_state.get()->getPayloadAddress(),
+ aggregation_handle_sum_.get()->getPayloadSize());
+ destination_hash_table_derived->putCompositeKey(common_key, buffer);
+
+ memcpy(buffer + 1,
+ exclusive_key_source_state.get()->getPayloadAddress(),
+ aggregation_handle_sum_.get()->getPayloadSize());
+ source_hash_table_derived->putCompositeKey(exclusive_source_key, buffer);
+
+ memcpy(buffer + 1,
+ exclusive_key_destination_state.get()->getPayloadAddress(),
+ aggregation_handle_sum_.get()->getPayloadSize());
+ destination_hash_table_derived->putCompositeKey(exclusive_destination_key,
+ buffer);
EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
EXPECT_EQ(2u, source_hash_table_derived->numEntries());
- aggregation_handle_sum_->mergeGroupByHashTables(*source_hash_table,
- destination_hash_table.get());
+ AggregationOperationState::mergeGroupByHashTables(
+ source_hash_table.get(), destination_hash_table.get());
EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
CheckSumValue<std::int64_t>(
common_key_merged_val.getLiteral<std::int64_t>(),
- *aggregation_handle_sum_derived,
- *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
- CheckSumValue<std::int64_t>(exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
- *aggregation_handle_sum_derived,
- *(destination_hash_table_derived->getSingleCompositeKey(
- exclusive_destination_key)));
- CheckSumValue<std::int64_t>(exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
- *aggregation_handle_sum_derived,
- *(source_hash_table_derived->getSingleCompositeKey(
- exclusive_source_key)));
+ aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+ destination_hash_table_derived->getSingleCompositeKey(common_key) +
+ 1));
+ CheckSumValue<std::int64_t>(
+ exclusive_key_destination_sum_val.getLiteral<std::int64_t>(),
+ aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+ destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_destination_key) +
+ 1));
+ CheckSumValue<std::int64_t>(
+ exclusive_key_source_sum_val.getLiteral<std::int64_t>(),
+ aggregation_handle_sum_derived->finalizeHashTableEntryFast(
+ source_hash_table_derived->getSingleCompositeKey(
+ exclusive_source_key) +
+ 1));
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a9efc4e/storage/AggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 05d0636..c5f59f9 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -59,7 +59,7 @@ namespace quickstep {
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
- const std::vector<const AggregateFunction*> &aggregate_functions,
+ const std::vector<const AggregateFunction *> &aggregate_functions,
std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
std::vector<bool> &&is_distinct,
std::vector<std::unique_ptr<const Scalar>> &&group_by,
@@ -78,7 +78,7 @@ AggregationOperationState::AggregationOperationState(
DCHECK(aggregate_functions.size() == arguments_.size());
// Get the types of GROUP BY expressions for creating HashTables below.
- std::vector<const Type*> group_by_types;
+ std::vector<const Type *> group_by_types;
for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
group_by_types.emplace_back(&group_by_element->getType());
}
@@ -94,27 +94,29 @@ AggregationOperationState::AggregationOperationState(
handles_.emplace_back(new AggregationHandleDistinct());
arguments_.push_back({});
is_distinct_.emplace_back(false);
- group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
- new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- {1},
- handles_,
- storage_manager)));
+ group_by_hashtable_pools_.emplace_back(
+ std::unique_ptr<HashTablePool>(new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ {1},
+ handles_,
+ storage_manager)));
} else {
// Set up each individual aggregate in this operation.
- std::vector<const AggregateFunction*>::const_iterator agg_func_it
- = aggregate_functions.begin();
- std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator args_it
- = arguments_.begin();
+ std::vector<const AggregateFunction *>::const_iterator agg_func_it =
+ aggregate_functions.begin();
+ std::vector<std::vector<std::unique_ptr<const Scalar>>>::const_iterator
+ args_it = arguments_.begin();
std::vector<bool>::const_iterator is_distinct_it = is_distinct_.begin();
- std::vector<HashTableImplType>::const_iterator distinctify_hash_table_impl_types_it
- = distinctify_hash_table_impl_types.begin();
+ std::vector<HashTableImplType>::const_iterator
+ distinctify_hash_table_impl_types_it =
+ distinctify_hash_table_impl_types.begin();
std::vector<std::size_t> payload_sizes;
- for (; agg_func_it != aggregate_functions.end(); ++agg_func_it, ++args_it, ++is_distinct_it) {
+ for (; agg_func_it != aggregate_functions.end();
+ ++agg_func_it, ++args_it, ++is_distinct_it) {
// Get the Types of this aggregate's arguments so that we can create an
// AggregationHandle.
- std::vector<const Type*> argument_types;
+ std::vector<const Type *> argument_types;
for (const std::unique_ptr<const Scalar> &argument : *args_it) {
argument_types.emplace_back(&argument->getType());
}
@@ -129,12 +131,13 @@ AggregationOperationState::AggregationOperationState(
handles_.emplace_back((*agg_func_it)->createHandle(argument_types));
if (!group_by_list_.empty()) {
- // Aggregation with GROUP BY: combined payload is partially updated in the presence of DISTINCT.
- if (*is_distinct_it) {
- handles_.back()->BlockUpdate();
- }
- group_by_handles.emplace_back(handles_.back());
- payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
+ // Aggregation with GROUP BY: combined payload is partially updated in
+ // the presence of DISTINCT.
+ if (*is_distinct_it) {
+ handles_.back()->blockUpdate();
+ }
+ group_by_handles.emplace_back(handles_.back());
+ payload_sizes.emplace_back(group_by_handles.back()->getPayloadSize());
} else {
// Aggregation without GROUP BY: create a single global state.
single_states_.emplace_back(handles_.back()->createInitialState());
@@ -146,31 +149,38 @@ AggregationOperationState::AggregationOperationState(
std::vector<attribute_id> local_arguments_as_attributes;
local_arguments_as_attributes.reserve(args_it->size());
for (const std::unique_ptr<const Scalar> &argument : *args_it) {
- const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
+ const attribute_id argument_id =
+ argument->getAttributeIdForValueAccessor();
if (argument_id == -1) {
local_arguments_as_attributes.clear();
break;
} else {
- DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+ DCHECK_EQ(input_relation_.getID(),
+ argument->getRelationIdForValueAccessor());
local_arguments_as_attributes.push_back(argument_id);
}
}
- arguments_as_attributes_.emplace_back(std::move(local_arguments_as_attributes));
+ arguments_as_attributes_.emplace_back(
+ std::move(local_arguments_as_attributes));
#endif
}
- // Initialize the corresponding distinctify hash table if this is a DISTINCT
+ // Initialize the corresponding distinctify hash table if this is a
+ // DISTINCT
// aggregation.
if (*is_distinct_it) {
- std::vector<const Type*> key_types(group_by_types);
- key_types.insert(key_types.end(), argument_types.begin(), argument_types.end());
- // TODO(jianqiao): estimated_num_entries is quite inaccurate for estimating
+ std::vector<const Type *> key_types(group_by_types);
+ key_types.insert(
+ key_types.end(), argument_types.begin(), argument_types.end());
+ // TODO(jianqiao): estimated_num_entries is quite inaccurate for
+ // estimating
// the number of entries in the distinctify hash table. We may estimate
- // for each distinct aggregation an estimated_num_distinct_keys value during
+ // for each distinct aggregation an estimated_num_distinct_keys value
+ // during
// query optimization, if it worths.
distinctify_hashtables_.emplace_back(
- AggregationStateFastHashTableFactory::CreateResizable(
+ AggregationStateFastHashTableFactory::CreateResizable(
*distinctify_hash_table_impl_types_it,
key_types,
estimated_num_entries,
@@ -184,16 +194,17 @@ AggregationOperationState::AggregationOperationState(
}
if (!group_by_handles.empty()) {
- // Aggregation with GROUP BY: create a HashTable pool for per-group states.
+ // Aggregation with GROUP BY: create a HashTable pool for per-group
+ // states.
group_by_hashtable_pools_.emplace_back(std::unique_ptr<HashTablePool>(
- new HashTablePool(estimated_num_entries,
- hash_table_impl_type,
- group_by_types,
- payload_sizes,
- group_by_handles,
- storage_manager)));
- }
+ new HashTablePool(estimated_num_entries,
+ hash_table_impl_type,
+ group_by_types,
+ payload_sizes,
+ group_by_handles,
+ storage_manager)));
}
+ }
}
AggregationOperationState* AggregationOperationState::ReconstructFromProto(
@@ -203,7 +214,7 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
DCHECK(ProtoIsValid(proto, database));
// Rebuild contructor arguments from their representation in 'proto'.
- std::vector<const AggregateFunction*> aggregate_functions;
+ std::vector<const AggregateFunction *> aggregate_functions;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments;
std::vector<bool> is_distinct;
std::vector<HashTableImplType> distinctify_hash_table_impl_types;
@@ -216,62 +227,63 @@ AggregationOperationState* AggregationOperationState::ReconstructFromProto(
arguments.emplace_back();
arguments.back().reserve(agg_proto.argument_size());
- for (int argument_idx = 0; argument_idx < agg_proto.argument_size(); ++argument_idx) {
+ for (int argument_idx = 0; argument_idx < agg_proto.argument_size();
+ ++argument_idx) {
arguments.back().emplace_back(ScalarFactory::ReconstructFromProto(
- agg_proto.argument(argument_idx),
- database));
+ agg_proto.argument(argument_idx), database));
}
is_distinct.emplace_back(agg_proto.is_distinct());
if (agg_proto.is_distinct()) {
distinctify_hash_table_impl_types.emplace_back(
- HashTableImplTypeFromProto(
- proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index)));
+ HashTableImplTypeFromProto(proto.distinctify_hash_table_impl_types(
+ distinctify_hash_table_impl_type_index)));
++distinctify_hash_table_impl_type_index;
}
}
std::vector<std::unique_ptr<const Scalar>> group_by_expressions;
- for (int group_by_idx = 0;
- group_by_idx < proto.group_by_expressions_size();
+ for (int group_by_idx = 0; group_by_idx < proto.group_by_expressions_size();
++group_by_idx) {
group_by_expressions.emplace_back(ScalarFactory::ReconstructFromProto(
- proto.group_by_expressions(group_by_idx),
- database));
+ proto.group_by_expressions(group_by_idx), database));
}
unique_ptr<Predicate> predicate;
if (proto.has_predicate()) {
predicate.reset(
- PredicateFactory::ReconstructFromProto(proto.predicate(),
- database));
+ PredicateFactory::ReconstructFromProto(proto.predicate(), database));
}
- return new AggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
- aggregate_functions,
- std::move(arguments),
- std::move(is_distinct),
- std::move(group_by_expressions),
- predicate.release(),
- proto.estimated_num_entries(),
- HashTableImplTypeFromProto(proto.hash_table_impl_type()),
- distinctify_hash_table_impl_types,
- storage_manager);
+ return new AggregationOperationState(
+ database.getRelationSchemaById(proto.relation_id()),
+ aggregate_functions,
+ std::move(arguments),
+ std::move(is_distinct),
+ std::move(group_by_expressions),
+ predicate.release(),
+ proto.estimated_num_entries(),
+ HashTableImplTypeFromProto(proto.hash_table_impl_type()),
+ distinctify_hash_table_impl_types,
+ storage_manager);
}
-bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOperationState &proto,
- const CatalogDatabaseLite &database) {
+bool AggregationOperationState::ProtoIsValid(
+ const serialization::AggregationOperationState &proto,
+ const CatalogDatabaseLite &database) {
if (!proto.IsInitialized() ||
!database.hasRelationWithId(proto.relation_id()) ||
(proto.aggregates_size() < 0)) {
return false;
}
- std::size_t num_distinctify_hash_tables = proto.distinctify_hash_table_impl_types_size();
+ std::size_t num_distinctify_hash_tables =
+ proto.distinctify_hash_table_impl_types_size();
std::size_t distinctify_hash_table_impl_type_index = 0;
for (int i = 0; i < proto.aggregates_size(); ++i) {
- if (!AggregateFunctionFactory::ProtoIsValid(proto.aggregates(i).function())) {
+ if (!AggregateFunctionFactory::ProtoIsValid(
+ proto.aggregates(i).function())) {
return false;
}
@@ -282,16 +294,18 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
for (int argument_idx = 0;
argument_idx < proto.aggregates(i).argument_size();
++argument_idx) {
- if (!ScalarFactory::ProtoIsValid(proto.aggregates(i).argument(argument_idx),
- database)) {
+ if (!ScalarFactory::ProtoIsValid(
+ proto.aggregates(i).argument(argument_idx), database)) {
return false;
}
}
if (proto.aggregates(i).is_distinct()) {
- if (distinctify_hash_table_impl_type_index >= num_distinctify_hash_tables ||
+ if (distinctify_hash_table_impl_type_index >=
+ num_distinctify_hash_tables ||
!serialization::HashTableImplType_IsValid(
- proto.distinctify_hash_table_impl_types(distinctify_hash_table_impl_type_index))) {
+ proto.distinctify_hash_table_impl_types(
+ distinctify_hash_table_impl_type_index))) {
return false;
}
}
@@ -304,8 +318,9 @@ bool AggregationOperationState::ProtoIsValid(const serialization::AggregationOpe
}
if (proto.group_by_expressions_size() > 0) {
- if (!proto.has_hash_table_impl_type()
- || !serialization::HashTableImplType_IsValid(proto.hash_table_impl_type())) {
+ if (!proto.has_hash_table_impl_type() ||
+ !serialization::HashTableImplType_IsValid(
+ proto.hash_table_impl_type())) {
return false;
}
}
@@ -327,7 +342,8 @@ void AggregationOperationState::aggregateBlock(const block_id input_block) {
}
}
-void AggregationOperationState::finalizeAggregate(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeAggregate(
+ InsertDestination *output_destination) {
if (group_by_list_.empty()) {
finalizeSingleState(output_destination);
} else {
@@ -346,19 +362,19 @@ void AggregationOperationState::mergeSingleState(
}
}
-void AggregationOperationState::aggregateBlockSingleState(const block_id input_block) {
+void AggregationOperationState::aggregateBlockSingleState(
+ const block_id input_block) {
// Aggregate per-block state for each aggregate.
std::vector<std::unique_ptr<AggregationState>> local_state;
- BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+ BlockReference block(
+ storage_manager_->getBlock(input_block, input_relation_));
// If there is a filter predicate, 'reuse_matches' holds the set of matching
// tuples so that it can be reused across multiple aggregates (i.e. we only
// pay the cost of evaluating the predicate once).
std::unique_ptr<TupleIdSequence> reuse_matches;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
const std::vector<attribute_id> *local_arguments_as_attributes = nullptr;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all arguments are attributes of the input relation, elide a copy.
@@ -381,12 +397,11 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
local_state.emplace_back(nullptr);
} else {
// Call StorageBlock::aggregate() to actually do the aggregation.
- local_state.emplace_back(
- block->aggregate(*handles_[agg_idx],
- arguments_[agg_idx],
- local_arguments_as_attributes,
- predicate_.get(),
- &reuse_matches));
+ local_state.emplace_back(block->aggregate(*handles_[agg_idx],
+ arguments_[agg_idx],
+ local_arguments_as_attributes,
+ predicate_.get(),
+ &reuse_matches));
}
}
@@ -394,8 +409,10 @@ void AggregationOperationState::aggregateBlockSingleState(const block_id input_b
mergeSingleState(local_state);
}
-void AggregationOperationState::aggregateBlockHashTable(const block_id input_block) {
- BlockReference block(storage_manager_->getBlock(input_block, input_relation_));
+void AggregationOperationState::aggregateBlockHashTable(
+ const block_id input_block) {
+ BlockReference block(
+ storage_manager_->getBlock(input_block, input_relation_));
// If there is a filter predicate, 'reuse_matches' holds the set of matching
// tuples so that it can be reused across multiple aggregates (i.e. we only
@@ -407,11 +424,10 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
// GROUP BY expressions once).
std::vector<std::unique_ptr<ColumnVector>> reuse_group_by_vectors;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
- // Call StorageBlock::aggregateDistinct() to insert the GROUP BY expression
+ // Call StorageBlock::aggregateDistinct() to insert the GROUP BY
+ // expression
// values and the aggregation arguments together as keys directly into the
// (threadsafe) shared global distinctify HashTable for this aggregate.
block->aggregateDistinct(*handles_[agg_idx],
@@ -429,7 +445,8 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
// directly into the (threadsafe) shared global HashTable for this
// aggregate.
DCHECK(group_by_hashtable_pools_[0] != nullptr);
- AggregationStateHashTableBase *agg_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+ AggregationStateHashTableBase *agg_hash_table =
+ group_by_hashtable_pools_[0]->getHashTableFast();
DCHECK(agg_hash_table != nullptr);
block->aggregateGroupByFast(arguments_,
group_by_list_,
@@ -440,32 +457,35 @@ void AggregationOperationState::aggregateBlockHashTable(const block_id input_blo
group_by_hashtable_pools_[0]->returnHashTable(agg_hash_table);
}
-void AggregationOperationState::finalizeSingleState(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeSingleState(
+ InsertDestination *output_destination) {
// Simply build up a Tuple from the finalized values for each aggregate and
// insert it in '*output_destination'.
std::vector<TypedValue> attribute_values;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
single_states_[agg_idx].reset(
- handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(*distinctify_hashtables_[agg_idx]));
+ handles_[agg_idx]->aggregateOnDistinctifyHashTableForSingle(
+ *distinctify_hashtables_[agg_idx]));
}
- attribute_values.emplace_back(handles_[agg_idx]->finalize(*single_states_[agg_idx]));
+ attribute_values.emplace_back(
+ handles_[agg_idx]->finalize(*single_states_[agg_idx]));
}
output_destination->insertTuple(Tuple(std::move(attribute_values)));
}
-void AggregationOperationState::mergeGroupByHashTables(AggregationStateHashTableBase *src,
- AggregationStateHashTableBase *dst) {
- HashTableMergerFast merger(dst);
- (static_cast<FastHashTable<true, false, true, false> *>(src))->forEachCompositeKeyFast(&merger);
+void AggregationOperationState::mergeGroupByHashTables(
+ AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst) {
+ HashTableMergerFast merger(dst);
+ (static_cast<FastHashTable<true, false, true, false> *>(src))
+ ->forEachCompositeKeyFast(&merger);
}
-void AggregationOperationState::finalizeHashTable(InsertDestination *output_destination) {
+void AggregationOperationState::finalizeHashTable(
+ InsertDestination *output_destination) {
// Each element of 'group_by_keys' is a vector of values for a particular
// group (which is also the prefix of the finalized Tuple for that group).
std::vector<std::vector<TypedValue>> group_by_keys;
@@ -483,17 +503,14 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
hash_table_index < static_cast<int>(hash_tables->size() - 1);
++hash_table_index) {
// Merge each hash table to the last hash table.
- mergeGroupByHashTables(
- (*hash_tables)[hash_table_index].get(),
- hash_tables->back().get());
+ mergeGroupByHashTables((*hash_tables)[hash_table_index].get(),
+ hash_tables->back().get());
}
}
// Collect per-aggregate finalized values.
std::vector<std::unique_ptr<ColumnVector>> final_values;
- for (std::size_t agg_idx = 0;
- agg_idx < handles_.size();
- ++agg_idx) {
+ for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
if (is_distinct_[agg_idx]) {
DCHECK(group_by_hashtable_pools_[0] != nullptr);
auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
@@ -502,18 +519,17 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
// We may have a case where hash_tables is empty, e.g. no input blocks.
// However for aggregateOnDistinctifyHashTableForGroupBy to work
// correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTableFast();
+ AggregationStateHashTableBase *new_hash_table =
+ group_by_hashtable_pools_[0]->getHashTableFast();
group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
}
DCHECK(hash_tables->back() != nullptr);
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
DCHECK(agg_hash_table != nullptr);
- handles_[agg_idx]->AllowUpdate();
+ handles_[agg_idx]->allowUpdate();
handles_[agg_idx]->aggregateOnDistinctifyHashTableForGroupBy(
- *distinctify_hashtables_[agg_idx],
- agg_hash_table,
- agg_idx);
+ *distinctify_hashtables_[agg_idx], agg_hash_table, agg_idx);
}
auto *hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
@@ -522,16 +538,15 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
// We may have a case where hash_tables is empty, e.g. no input blocks.
// However for aggregateOnDistinctifyHashTableForGroupBy to work
// correctly, we should create an empty group by hash table.
- AggregationStateHashTableBase *new_hash_table = group_by_hashtable_pools_[0]->getHashTable();
+ AggregationStateHashTableBase *new_hash_table =
+ group_by_hashtable_pools_[0]->getHashTable();
group_by_hashtable_pools_[0]->returnHashTable(new_hash_table);
hash_tables = group_by_hashtable_pools_[0]->getAllHashTables();
}
AggregationStateHashTableBase *agg_hash_table = hash_tables->back().get();
DCHECK(agg_hash_table != nullptr);
- ColumnVector* agg_result_col =
- handles_[agg_idx]->finalizeHashTable(*agg_hash_table,
- &group_by_keys,
- agg_idx);
+ ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
+ *agg_hash_table, &group_by_keys, agg_idx);
if (agg_result_col != nullptr) {
final_values.emplace_back(agg_result_col);
}
@@ -549,16 +564,20 @@ void AggregationOperationState::finalizeHashTable(InsertDestination *output_dest
for (const std::unique_ptr<const Scalar> &group_by_element : group_by_list_) {
const Type &group_by_type = group_by_element->getType();
if (NativeColumnVector::UsableForType(group_by_type)) {
- NativeColumnVector *element_cv = new NativeColumnVector(group_by_type, group_by_keys.size());
+ NativeColumnVector *element_cv =
+ new NativeColumnVector(group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
} else {
- IndirectColumnVector *element_cv = new IndirectColumnVector(group_by_type, group_by_keys.size());
+ IndirectColumnVector *element_cv =
+ new IndirectColumnVector(group_by_type, group_by_keys.size());
group_by_cvs.emplace_back(element_cv);
for (std::vector<TypedValue> &group_key : group_by_keys) {
- element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
+ element_cv->appendTypedValue(
+ std::move(group_key[group_by_element_idx]));
}
}
++group_by_element_idx;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a9efc4e/storage/AggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index d408c22..7956bc6 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -102,16 +102,17 @@ class AggregationOperationState {
* tables. Single aggregation state (when GROUP BY list is not
* specified) is not allocated using memory from storage manager.
*/
- AggregationOperationState(const CatalogRelationSchema &input_relation,
- const std::vector<const AggregateFunction*> &aggregate_functions,
- std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
- std::vector<bool> &&is_distinct,
- std::vector<std::unique_ptr<const Scalar>> &&group_by,
- const Predicate *predicate,
- const std::size_t estimated_num_entries,
- const HashTableImplType hash_table_impl_type,
- const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
- StorageManager *storage_manager);
+ AggregationOperationState(
+ const CatalogRelationSchema &input_relation,
+ const std::vector<const AggregateFunction *> &aggregate_functions,
+ std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments,
+ std::vector<bool> &&is_distinct,
+ std::vector<std::unique_ptr<const Scalar>> &&group_by,
+ const Predicate *predicate,
+ const std::size_t estimated_num_entries,
+ const HashTableImplType hash_table_impl_type,
+ const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
+ StorageManager *storage_manager);
~AggregationOperationState() {}
@@ -143,8 +144,9 @@ class AggregationOperationState {
* in.
* @return Whether proto is fully-formed and valid.
**/
- static bool ProtoIsValid(const serialization::AggregationOperationState &proto,
- const CatalogDatabaseLite &database);
+ static bool ProtoIsValid(
+ const serialization::AggregationOperationState &proto,
+ const CatalogDatabaseLite &database);
/**
* @brief Compute aggregates on the tuples of the given storage block,
@@ -165,12 +167,16 @@ class AggregationOperationState {
**/
void finalizeAggregate(InsertDestination *output_destination);
+ static void mergeGroupByHashTables(AggregationStateHashTableBase *src,
+ AggregationStateHashTableBase *dst);
+
int dflag;
private:
// Merge locally (per storage block) aggregated states with global aggregation
// states.
- void mergeSingleState(const std::vector<std::unique_ptr<AggregationState>> &local_state);
+ void mergeSingleState(
+ const std::vector<std::unique_ptr<AggregationState>> &local_state);
// Aggregate on input block.
void aggregateBlockSingleState(const block_id input_block);
@@ -187,7 +193,7 @@ class AggregationOperationState {
// Each individual aggregate in this operation has an AggregationHandle and
// some number of Scalar arguments.
-// std::vector<std::unique_ptr<AggregationHandle>> handles_;
+ // std::vector<std::unique_ptr<AggregationHandle>> handles_;
std::vector<AggregationHandle *> handles_;
std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_;
@@ -196,7 +202,8 @@ class AggregationOperationState {
std::vector<bool> is_distinct_;
// Hash table for obtaining distinct (i.e. unique) arguments.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> distinctify_hashtables_;
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+ distinctify_hashtables_;
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// If all an aggregate's argument expressions are simply attributes in
@@ -211,15 +218,14 @@ class AggregationOperationState {
//
// TODO(shoban): We should ideally store the aggregation state together in one
// hash table to prevent multiple lookups.
- std::vector<std::unique_ptr<AggregationStateHashTableBase>> group_by_hashtables_;
+ std::vector<std::unique_ptr<AggregationStateHashTableBase>>
+ group_by_hashtables_;
// A vector of group by hash table pools, one for each group by clause.
std::vector<std::unique_ptr<HashTablePool>> group_by_hashtable_pools_;
StorageManager *storage_manager_;
- void mergeGroupByHashTables(AggregationStateHashTableBase *src, AggregationStateHashTableBase *dst);
-
DISALLOW_COPY_AND_ASSIGN(AggregationOperationState);
};