You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/13 21:26:54 UTC
incubator-quickstep git commit: Removed finalize()
Repository: incubator-quickstep
Updated Branches:
refs/heads/SQL-window-aggregation c50ce5139 -> 0f5c41d15
Removed finalize()
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0f5c41d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0f5c41d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0f5c41d1
Branch: refs/heads/SQL-window-aggregation
Commit: 0f5c41d1531032065e747b642b9013802f74ac06
Parents: c50ce51
Author: shixuan-fan <sh...@apache.org>
Authored: Wed Jul 13 21:26:29 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Wed Jul 13 21:26:29 2016 +0000
----------------------------------------------------------------------
.../WindowAggregationHandle.hpp | 16 +-
.../WindowAggregationHandleAvg.cpp | 47 +-
.../WindowAggregationHandleAvg.hpp | 18 +-
.../WindowAggregationHandleAvg_unittest.cpp | 600 +++----------------
storage/WindowAggregationOperationState.cpp | 19 +-
5 files changed, 140 insertions(+), 560 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 8511b9e..831bcbf 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -102,14 +102,12 @@ class WindowAggregationHandle {
* NULL if all arguments are attributes.
* @param output_destination The destination for output.
**/
- virtual void calculate(ColumnVectorsValueAccessor* block_accessors,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) = 0;
-
- virtual ValueAccessor* finalize() = 0;
+ virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+ std::vector<ColumnVector*> &&arguments,
+ const std::vector<attribute_id> &partition_by_ids,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) = 0;
protected:
/**
@@ -133,8 +131,6 @@ class WindowAggregationHandle {
}
}
- std::unique_ptr<ColumnVectorsValueAccessor> tuple_accessor_;
- std::unique_ptr<NativeColumnVector> window_aggregates_;
const CatalogRelationSchema &relation_;
std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 7daaddf..14fc1d9 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -88,22 +88,21 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
.makeUncheckedBinaryOperatorForTypes(*sum_type_, TypeFactory::GetType(kDouble)));
}
-void WindowAggregationHandleAvg::calculate(ColumnVectorsValueAccessor *tuple_accessor,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) {
+ColumnVector* WindowAggregationHandleAvg::calculate(
+ ColumnVectorsValueAccessor *tuple_accessor,
+ std::vector<ColumnVector*> &&arguments,
+ const std::vector<attribute_id> &partition_by_ids,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) {
DCHECK(arguments.size() == 1);
DCHECK(arguments[0]->isNative());
DCHECK(static_cast<std::size_t>(tuple_accessor->getNumTuples()) ==
static_cast<const NativeColumnVector*>(arguments[0])->size());
-
- tuple_accessor_.reset(tuple_accessor);
// Initialize the output column and argument accessor.
- window_aggregates_.reset(
- new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples()));
+ NativeColumnVector *window_aggregates =
+ new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());
ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor();
argument_accessor->addColumn(arguments[0]);
@@ -111,22 +110,21 @@ void WindowAggregationHandleAvg::calculate(ColumnVectorsValueAccessor *tuple_acc
tuple_accessor->beginIteration();
argument_accessor->beginIteration();
- while (tuple_accessor_->next() && argument_accessor->next()) {
- const TypedValue window_aggregate = this->calculateOneWindow(argument_accessor,
+ while (tuple_accessor->next() && argument_accessor->next()) {
+ const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor,
+ argument_accessor,
partition_by_ids,
is_row,
num_preceding,
num_following);
- window_aggregates_->appendTypedValue(window_aggregate);
+ window_aggregates->appendTypedValue(window_aggregate);
}
-}
-ValueAccessor* WindowAggregationHandleAvg::finalize() {
- tuple_accessor_->addColumn(window_aggregates_.release());
- return tuple_accessor_.get();
+ return window_aggregates;
}
TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+ ColumnVectorsValueAccessor *tuple_accessor,
ColumnVectorsValueAccessor *argument_accessor,
const std::vector<attribute_id> &partition_by_ids,
const bool is_row,
@@ -149,11 +147,11 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
std::vector<TypedValue> current_row_partition_key;
for (attribute_id partition_by_id : partition_by_ids) {
current_row_partition_key.push_back(
- tuple_accessor_->getTypedValue(partition_by_id));
+ tuple_accessor->getTypedValue(partition_by_id));
}
// Get current position.
- tuple_id current_tuple_id = tuple_accessor_->getCurrentPositionVirtual();
+ tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
// Find preceding tuples.
int count_preceding = 0;
@@ -168,7 +166,8 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
// Get the partition keys and compare. If not the same partition as the
// current row, stop searching preceding tuples.
- if (!samePartition(current_row_partition_key,
+ if (!samePartition(tuple_accessor,
+ current_row_partition_key,
preceding_tuple_id,
partition_by_ids)) {
break;
@@ -196,13 +195,14 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
following_tuple_id++;
// No more following tuples.
- if (following_tuple_id == tuple_accessor_->getNumTuples()) {
+ if (following_tuple_id == tuple_accessor->getNumTuples()) {
break;
}
// Get the partition keys and compare. If not the same partition as the
// current row, stop searching preceding tuples.
- if (!samePartition(current_row_partition_key,
+ if (!samePartition(tuple_accessor,
+ current_row_partition_key,
following_tuple_id,
partition_by_ids)) {
break;
@@ -229,6 +229,7 @@ TypedValue WindowAggregationHandleAvg::calculateOneWindow(
}
bool WindowAggregationHandleAvg::samePartition(
+ const ColumnVectorsValueAccessor *tuple_accessor,
const std::vector<TypedValue> &current_row_partition_key,
const tuple_id boundary_tuple_id,
const std::vector<attribute_id> &partition_by_ids) const {
@@ -237,7 +238,7 @@ bool WindowAggregationHandleAvg::samePartition(
++partition_by_index) {
if (!equal_comparators_[partition_by_index]->compareTypedValues(
current_row_partition_key[partition_by_index],
- tuple_accessor_->getTypedValueAtAbsolutePosition(
+ tuple_accessor->getTypedValueAtAbsolutePosition(
partition_by_ids[partition_by_index], boundary_tuple_id))) {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 4eb0846..72076fa 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -55,14 +55,12 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
public:
~WindowAggregationHandleAvg() override {}
- void calculate(ColumnVectorsValueAccessor* block_accessors,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following);
-
- ValueAccessor* finalize() override;
+ ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
+ std::vector<ColumnVector*> &&arguments,
+ const std::vector<attribute_id> &partition_by_ids,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following);
private:
friend class WindowAggregateFunctionAvg;
@@ -83,13 +81,15 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
const Type &type);
TypedValue calculateOneWindow(
+ ColumnVectorsValueAccessor *tuple_accessor,
ColumnVectorsValueAccessor *argument_accessor,
const std::vector<attribute_id> &partition_by_ids,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following) const;
- bool samePartition(const std::vector<TypedValue> &current_row_partition_key,
+ bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+ const std::vector<TypedValue> &current_row_partition_key,
const tuple_id boundary_tuple_id,
const std::vector<attribute_id> &partition_by_ids) const;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index 8fd3c8a..58c8019 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -28,7 +28,6 @@
#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
#include "expressions/window_aggregation/WindowAggregationID.hpp"
-#include "storage/StorageManager.hpp"
#include "types/CharType.hpp"
#include "types/DateOperatorOverloads.hpp"
#include "types/DatetimeIntervalType.hpp"
@@ -51,141 +50,15 @@ namespace quickstep {
namespace {
- constexpr int kNumTuplesPerBlock = 100;
- constexpr int kNumBlocks = 5;
+ constexpr int kNumTuples = 100;
constexpr int kNumTuplesPerPartition = 8;
+ constexpr int kNullInterval = 25;
} // namespace
// Attribute value could be null if set true.
class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
- protected:
- virtual void SetUp() {
- // Initialize relation and storage manager.
- relation_.reset(new CatalogRelation(NULL, "TestRelation", kRelationId));
- storage_manager_.reset(new StorageManager("TestAvg"));
-
- // Add All kinds of TypedValues.
- CatalogAttribute *int_attr = new CatalogAttribute(relation_.get(),
- "int_attr",
- TypeFactory::GetType(kInt, GetParam()));
-
- relation_->addAttribute(int_attr);
-
- CatalogAttribute *float_attr = new CatalogAttribute(relation_.get(),
- "float_attr",
- TypeFactory::GetType(kFloat, GetParam()));
- relation_->addAttribute(float_attr);
-
- CatalogAttribute *long_attr = new CatalogAttribute(relation_.get(),
- "long_attr",
- TypeFactory::GetType(kLong, GetParam()));
- relation_->addAttribute(long_attr);
-
- CatalogAttribute *double_attr = new CatalogAttribute(relation_.get(),
- "double_attr",
- TypeFactory::GetType(kDouble, GetParam()));
- relation_->addAttribute(double_attr);
-
- CatalogAttribute *char_attr = new CatalogAttribute(relation_.get(),
- "char_attr",
- TypeFactory::GetType(kChar, 4, GetParam()));
- relation_->addAttribute(char_attr);
-
- CatalogAttribute *varchar_attr = new CatalogAttribute(relation_.get(),
- "varchar_attr",
- TypeFactory::GetType(kVarChar, 32, GetParam()));
- relation_->addAttribute(varchar_attr);
-
- // Records the 'base_value' of a tuple used in createSampleTuple.
- CatalogAttribute *partition_value = new CatalogAttribute(relation_.get(),
- "partition_value",
- TypeFactory::GetType(kInt, false));
- relation_->addAttribute(partition_value);
-
- StorageBlockLayout *layout = StorageBlockLayout::GenerateDefaultLayout(*relation_, true);
-
- // Initialize blocks.
- for (int i = 0; i < kNumBlocks; ++i) {
- block_id bid = storage_manager_->createBlock(relation_, layout);
- relation_->addBlock(bid);
- insertTuples(bid);
- }
- }
-
- // Insert kNumTuplesPerBlock tuples into the block.
- void insertTuples(block_id bid) {
- MutableBlockReference block = storage_manager_->getBlockMutable(bid, relation_);
- for (int i = 0; i < kNumTuplesPerBlock; ++i) {
- Tuple *tuple = createTuple(bid * kNumTuplesPerBlock + i);
- block->insertTuple(*tuple);
- }
- }
-
- Tuple* createTuple(int base_value) {
- std::vector<TypedValue> attrs;
-
- // int_attr.
- if (GetParam() && base_value % 10 == 0) {
- // Throw in a NULL integer for every ten values.
- attrs.emplace_back(kInt);
- } else {
- attrs.emplace_back(base_value);
- }
-
- // float_attr.
- if (GetParam() && base_value % 10 == 1) {
- attrs.emplace_back(kFloat);
- } else {
- attrs.emplace_back(static_cast<float>(0.4 * base_value));
- }
-
- // long_attr.
- if (GetParam() && base_value % 10 == 2) {
- attrs.emplace_back(kLong);
- } else {
- attrs.emplace_back(static_cast<std::int64_t>(base_value));
- }
-
- // double_attr.
- if (GetParam() && base_value % 10 == 3) {
- attrs.emplace_back(kDouble);
- } else {
- attrs.emplace_back(static_cast<double>(0.25 * base_value));
- }
-
- // char_attr
- if (GetParam() && base_value % 10 == 4) {
- attrs.emplace_back(CharType::InstanceNullable(4).makeNullValue());
- } else {
- std::ostringstream char_buffer;
- char_buffer << base_value;
- std::string string_literal(char_buffer.str());
- attrs.emplace_back(CharType::InstanceNonNullable(4).makeValue(
- string_literal.c_str(),
- string_literal.size() > 3 ? 4
- : string_literal.size() + 1));
- attrs.back().ensureNotReference();
- }
-
- // varchar_attr
- if (GetParam() && base_value % 10 == 5) {
- attrs.emplace_back(VarCharType::InstanceNullable(32).makeNullValue());
- } else {
- std::ostringstream char_buffer;
- char_buffer << "Here are some numbers: " << base_value;
- std::string string_literal(char_buffer.str());
- attrs.emplace_back(VarCharType::InstanceNonNullable(32).makeValue(
- string_literal.c_str(),
- string_literal.size() + 1));
- attrs.back().ensureNotReference();
- }
-
- // base_value
- attrs.emplace_back(base_value / kNumTuplesPerPartition);
- return new Tuple(std::move(attrs));
- }
-
+ protected:
// Handle initialization.
void initializeHandle(const Type &argument_type,
const std::vector<const Type*> &partition_key_types) {
@@ -217,14 +90,18 @@ class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
template <typename CppType>
static void CheckAvgValues(
- std::vector<CppType> expected,
+ std::vector<CppType*> expected,
const ColumnVector *actual) {
EXPECT_TRUE(actual->isNative());
NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
EXPECT_EQ(expected.size(), actual->size());
for (std::size_t i = 0; i < expected.size(); ++i) {
- EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+ if (expected[i] == nullptr) {
+ EXPECT_TRUE(actual->getTypedValue(i).isNull());
+ } else {
+ EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+ }
}
}
@@ -238,405 +115,110 @@ class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
void checkAggregationAvgGeneric() {
const GenericType &type = GenericType::Instance(true);
initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_avg_->finalize(relation_, storage_manager_).empty());
-
- aggregation_handle_avg_->calculate(relation_.getBlocksSnapshot(),
- std::vector<GenericType
-
+ EXPECT_EQ(0, aggregation_handle_avg_->finalize()->getNumTuplesVirtual());
+
+ // Create argument, partition key and cpptype vectors.
+ std::vector<GenericType::cpptype*> argument_cpp_vector;
+ argument_cpp_vector.reserve(kNumTuples);
+ ColumnVector *argument_type_vector =
+ createArgumentGeneric<GenericType>(&argument_cpp_vector);
+ const IntType &int_type = ;
+ NativeColumnVector *partition_key_vector =
+ new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+
+ for (int i = 0; i < kNumTuples; ++i) {
+ partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+ }
- std::vector<OutputType> result_vector;
- typename GenericType::cpptype val;
- typename GenericType::cpptype sum;
- SetDataType(0, &sum);
+ // Create tuple ValueAccessor
+ ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
+ tuple_accessor->addColumn(partition_key_vector);
+ tuple_accessor->addColumn(argument_type_vector);
- for (int i = 0; i < kNumSamples; ++i) {
- if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
- SetDataType(i - 10, &val);
- } else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
- }
- iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val));
- sum += val;
- }
- iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
- CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
- *aggregation_handle_avg_,
- *aggregation_handle_avg_state_);
+ // Test UNBOUNDED PRECEDING AND CURRENT ROW.
+ calculateAccumulative<GenericType, OutputType>(tuple_accessor,
+ argument_type_vector,
+ argument_cpp_vector);
}
template <typename GenericType>
- ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) {
- NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
-
- typename GenericType::cpptype val;
- SetDataType(0, sum);
+ ColumnVector *createArgumentGeneric(
+ std::vector<GenericType::cpptype*> *argument_cpp_vector) {
+ const GenericType &type = GenericType::Instance(true);
+ NativeColumnVector *column = new NativeColumnVector(type, kNumSamples);
column->appendTypedValue(type.makeNullValue());
for (int i = 0; i < kNumSamples; ++i) {
+ // Insert a NULL every kNullInterval tuples.
+ if (i % kNullInterval == 0) {
+ argument_cpp_vector->push_back(nullptr);
+ column->appendTypedValue(type.makeNullValue());
+ continue;
+ }
+
+ typename GenericType::cpptype val = new GenericType::cpptype;
+
if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
- SetDataType(i - 10, &val);
+ SetDataType(i - 10, val);
} else {
- SetDataType(static_cast<float>(i - 10)/10, &val);
- }
- column->appendTypedValue(type.makeValue(&val));
- *sum += val;
- // One NULL in the middle.
- if (i == kNumSamples/2) {
- column->appendTypedValue(type.makeNullValue());
+ SetDataType(static_cast<float>(i - 10) / 10, val);
}
+
+ column->appendTypedValue(type.makeValue(val));
+ argument_cpp_vector->push_back(val);
}
- column->appendTypedValue(type.makeNullValue());
return column;
}
- template <typename GenericType, typename OutputType = DoubleType>
- void checkAggregationAvgGenericColumnVector() {
- const GenericType &type = GenericType::Instance(true);
- initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
-
- typename GenericType::cpptype sum;
- SetDataType(0, &sum);
- std::vector<std::unique_ptr<ColumnVector>> column_vectors;
- column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum));
-
- std::unique_ptr<AggregationState> cv_state(
- aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
-
- // Test the state generated directly by accumulateColumnVectors(), and also
- // test after merging back.
- CheckAvgValue<typename OutputType::cpptype>(
- static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
- *aggregation_handle_avg_,
- *cv_state);
-
- aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get());
- CheckAvgValue<typename OutputType::cpptype>(
- static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
- *aggregation_handle_avg_,
- *aggregation_handle_avg_state_);
- }
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- template <typename GenericType, typename OutputType = DoubleType>
- void checkAggregationAvgGenericValueAccessor() {
- const GenericType &type = GenericType::Instance(true);
- initializeHandle(type);
- EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
-
+ template <typename GenericType, typename OutputType>
+ void calculateAccumulate(ValueAccessor *tuple_accessor,
+ ColumnVector *argument_type_vector,
+ const std::vector<GenericType::cpptype> &argument_cpp_vector) {
+ std::vector<ColumnVector*> arguments;
+ arguments.push_back(argument_type_vector);
+ // The partition key index is 0.
+ std::vector<attribute_id> partition_key(1, 0);
+
+ ColumnVector *result =
+ handle_avg_->calculate(tuple_accessor,
+ std::move(arguments),
+ partition_key,
+ true /* is_row */,
+ -1 /* num_preceding: UNBOUNDED PRECEDING */,
+ 0 /* num_following: CURRENT ROW */);
+
+ // Get the cpptype result.
+ std::vector<OutputType::cpptype*> result_cpp_vector;
+ bool is_null;
typename GenericType::cpptype sum;
- SetDataType(0, &sum);
- std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
- accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum));
+ int count;
+ for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+ // Start of new partition
+ if (i % kNumTuplesPerPartition == 0) {
+ is_null = false;
+ SetDataType(0, &sum);
+ count = 0;
+ }
- std::unique_ptr<AggregationState> va_state(
- aggregation_handle_avg_->accumulateValueAccessor(accessor.get(),
- std::vector<attribute_id>(1, 0)));
+ typename GenericType::cpptype *value = argument_cpp_vector[i];
+ if (value == nullptr) {
+ is_null = true;
+ }
- // Test the state generated directly by accumulateValueAccessor(), and also
- // test after merging back.
- CheckAvgValue<typename OutputType::cpptype>(
- static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
- *aggregation_handle_avg_,
- *va_state);
+ if (is_null) {
+ result_cpp_vector.push_back(nullptr);
+ } else {
+ sum += *value;
+ count++;
+ result_cpp_vector.push_back(static_cast<typename OutputType>(sum) / count);
+ }
+ }
- aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get());
- CheckAvgValue<typename OutputType::cpptype>(
- static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
- *aggregation_handle_avg_,
- *aggregation_handle_avg_state_);
+ CheckAvgValues(result_cpp_vector, result);
}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
- std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
- std::unique_ptr<StorageManager> storage_manager_;
- std::unique_ptr<CatalogRelation> relation_;
+ std::unique_ptr<WindowAggregationHandle> handle_avg_;
};
-const int AggregationHandleAvgTest::kNumSamples;
-
-template <>
-void AggregationHandleAvgTest::CheckAvgValue<double>(
- double expected,
- const AggregationHandle &handle,
- const AggregationState &state) {
- EXPECT_DOUBLE_EQ(expected, handle.finalize(state).getLiteral<double>());
-}
-
-template <>
-void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
- data->interval_ticks = value;
-}
-
-template <>
-void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
- data->months = value;
-}
-
-typedef AggregationHandleAvgTest AggregationHandleAvgDeathTest;
-
-TEST_F(AggregationHandleAvgTest, IntTypeTest) {
- checkAggregationAvgGeneric<IntType>();
-}
-
-TEST_F(AggregationHandleAvgTest, LongTypeTest) {
- checkAggregationAvgGeneric<LongType>();
-}
-
-TEST_F(AggregationHandleAvgTest, FloatTypeTest) {
- checkAggregationAvgGeneric<FloatType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DoubleTypeTest) {
- checkAggregationAvgGeneric<DoubleType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeTest) {
- checkAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeTest) {
- checkAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, IntTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<IntType>();
-}
-
-TEST_F(AggregationHandleAvgTest, LongTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<LongType>();
-}
-
-TEST_F(AggregationHandleAvgTest, FloatTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<FloatType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<DoubleType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {
- checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
-}
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-TEST_F(AggregationHandleAvgTest, IntTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<IntType>();
-}
-
-TEST_F(AggregationHandleAvgTest, LongTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<LongType>();
-}
-
-TEST_F(AggregationHandleAvgTest, FloatTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<FloatType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DoubleTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<DoubleType>();
-}
-
-TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
-}
-
-TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {
- checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
-}
-#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
-
-#ifdef QUICKSTEP_DEBUG
-TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) {
- const Type &type = CharType::Instance(true, 10);
- EXPECT_DEATH(initializeHandle(type), "");
-}
-
-TEST_F(AggregationHandleAvgDeathTest, VarTypeTest) {
- const Type &type = VarCharType::Instance(true, 10);
- EXPECT_DEATH(initializeHandle(type), "");
-}
-
-TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) {
- const Type &int_non_null_type = IntType::Instance(false);
- const Type &long_type = LongType::Instance(true);
- const Type &double_type = DoubleType::Instance(true);
- const Type &float_type = FloatType::Instance(true);
- const Type &char_type = CharType::Instance(true, 10);
- const Type &varchar_type = VarCharType::Instance(true, 10);
-
- initializeHandle(IntType::Instance(true));
- int int_val = 0;
- std::int64_t long_val = 0;
- double double_val = 0;
- float float_val = 0;
-
- iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val));
-
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), "");
- EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), "");
-
- // Test mergeStates() with incorrectly typed handles.
- std::unique_ptr<AggregationHandle> aggregation_handle_avg_double(
- AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
- std::vector<const Type*>(1, &double_type)));
- std::unique_ptr<AggregationState> aggregation_state_avg_merge_double(
- aggregation_handle_avg_double->createInitialState());
- static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl(
- static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()),
- double_type.makeValue(&double_val));
- EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
- aggregation_handle_avg_state_.get()),
- "");
-
- std::unique_ptr<AggregationHandle> aggregation_handle_avg_float(
- AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
- std::vector<const Type*>(1, &float_type)));
- std::unique_ptr<AggregationState> aggregation_state_avg_merge_float(
- aggregation_handle_avg_float->createInitialState());
- static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl(
- static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()),
- float_type.makeValue(&float_val));
- EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
- aggregation_handle_avg_state_.get()),
- "");
-}
-#endif
-
-TEST_F(AggregationHandleAvgTest, canApplyToTypeTest) {
- EXPECT_TRUE(ApplyToTypesTest(kInt));
- EXPECT_TRUE(ApplyToTypesTest(kLong));
- EXPECT_TRUE(ApplyToTypesTest(kFloat));
- EXPECT_TRUE(ApplyToTypesTest(kDouble));
- EXPECT_FALSE(ApplyToTypesTest(kChar));
- EXPECT_FALSE(ApplyToTypesTest(kVarChar));
- EXPECT_FALSE(ApplyToTypesTest(kDatetime));
- EXPECT_TRUE(ApplyToTypesTest(kDatetimeInterval));
- EXPECT_TRUE(ApplyToTypesTest(kYearMonthInterval));
-}
-
-TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kInt, kDouble));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));
- EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
-}
-
-TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
- const Type &long_non_null_type = LongType::Instance(false);
- initializeHandle(long_non_null_type);
- storage_manager_.reset(new StorageManager("./test_avg_data"));
- std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
- aggregation_handle_avg_->createGroupByHashTable(
- HashTableImplType::kSimpleScalarSeparateChaining,
- std::vector<const Type *>(1, &long_non_null_type),
- 10,
- storage_manager_.get()));
- std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
- aggregation_handle_avg_->createGroupByHashTable(
- HashTableImplType::kSimpleScalarSeparateChaining,
- std::vector<const Type *>(1, &long_non_null_type),
- 10,
- storage_manager_.get()));
-
- AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
- static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
- destination_hash_table.get());
-
- AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
- static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
- source_hash_table.get());
-
- AggregationHandleAvg *aggregation_handle_avg_derived =
- static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
- // We create three keys: first is present in both the hash tables, second key
- // is present only in the source hash table while the third key is present
- // the destination hash table only.
- std::vector<TypedValue> common_key;
- common_key.emplace_back(static_cast<std::int64_t>(0));
- std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
- exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
- exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
-
- const std::int64_t common_key_source_avg = 355;
- TypedValue common_key_source_avg_val(common_key_source_avg);
-
- const std::int64_t common_key_destination_avg = 295;
- TypedValue common_key_destination_avg_val(common_key_destination_avg);
-
- const std::int64_t exclusive_key_source_avg = 1;
- TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg);
-
- const std::int64_t exclusive_key_destination_avg = 1;
- TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg);
-
- std::unique_ptr<AggregationStateAvg> common_key_source_state(
- static_cast<AggregationStateAvg *>(
- aggregation_handle_avg_->createInitialState()));
- std::unique_ptr<AggregationStateAvg> common_key_destination_state(
- static_cast<AggregationStateAvg *>(
- aggregation_handle_avg_->createInitialState()));
- std::unique_ptr<AggregationStateAvg> exclusive_key_source_state(
- static_cast<AggregationStateAvg *>(
- aggregation_handle_avg_->createInitialState()));
- std::unique_ptr<AggregationStateAvg> exclusive_key_destination_state(
- static_cast<AggregationStateAvg *>(
- aggregation_handle_avg_->createInitialState()));
-
- // Create avg value states for keys.
- aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(),
- common_key_source_avg_val);
-
- aggregation_handle_avg_derived->iterateUnaryInl(
- common_key_destination_state.get(), common_key_destination_avg_val);
-
- aggregation_handle_avg_derived->iterateUnaryInl(
- exclusive_key_destination_state.get(), exclusive_key_destination_avg_val);
-
- aggregation_handle_avg_derived->iterateUnaryInl(
- exclusive_key_source_state.get(), exclusive_key_source_avg_val);
-
- // Add the key-state pairs to the hash tables.
- source_hash_table_derived->putCompositeKey(common_key,
- *common_key_source_state);
- destination_hash_table_derived->putCompositeKey(
- common_key, *common_key_destination_state);
- source_hash_table_derived->putCompositeKey(exclusive_source_key,
- *exclusive_key_source_state);
- destination_hash_table_derived->putCompositeKey(
- exclusive_destination_key, *exclusive_key_destination_state);
-
- EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
- EXPECT_EQ(2u, source_hash_table_derived->numEntries());
-
- aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
- destination_hash_table.get());
-
- EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
-
- CheckAvgValue<double>(
- (common_key_destination_avg_val.getLiteral<std::int64_t>() +
- common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
- *aggregation_handle_avg_derived,
- *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
- CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
- *aggregation_handle_avg_derived,
- *(destination_hash_table_derived->getSingleCompositeKey(
- exclusive_destination_key)));
- CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
- *aggregation_handle_avg_derived,
- *(source_hash_table_derived->getSingleCompositeKey(
- exclusive_source_key)));
-}
-
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f5c41d1/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index de8cfeb..ea522b8 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -280,15 +280,16 @@ void WindowAggregationOperationState::windowAggregateBlocks(
}
// Do actual calculation in handle.
- window_aggregation_handle_->calculate(all_blocks_accessor,
- std::move(argument_vecs),
- partition_by_ids_,
- is_row_,
- num_preceding_,
- num_following_);
-
- ValueAccessor* output_accessor = window_aggregation_handle_->finalize();
- output_destination->bulkInsertTuples(output_accessor);
+ ColumnVector *window_aggregates =
+ window_aggregation_handle_->calculate(all_blocks_accessor,
+ std::move(argument_vecs),
+ partition_by_ids_,
+ is_row_,
+ num_preceding_,
+ num_following_);
+
+ all_blocks_accessor->addColumn(window_aggregates);
+ output_destination->bulkInsertTuples(all_blocks_accessor);
}
} // namespace quickstep