You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/11 14:12:22 UTC
[3/3] incubator-quickstep git commit: Separated calculation into two
parts to check intermediate result
Separated calculation into two parts to check intermediate result
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d5f535ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d5f535ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d5f535ee
Branch: refs/heads/SQL-window-aggregation
Commit: d5f535ee9f0e3c7a8ea7615a07699b83c5f905f4
Parents: 2966405
Author: shixuan-fan <sh...@apache.org>
Authored: Mon Jul 11 14:11:47 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Mon Jul 11 14:11:47 2016 +0000
----------------------------------------------------------------------
.../WindowAggregateFunction.hpp | 4 +
.../WindowAggregateFunctionAvg.cpp | 6 +-
.../WindowAggregateFunctionAvg.hpp | 2 +
.../WindowAggregationHandle.hpp | 16 +-
.../WindowAggregationHandleAvg.cpp | 78 ++++++----
.../WindowAggregationHandleAvg.hpp | 11 +-
.../WindowAggregationHandleAvg_unittest.cpp | 148 ++++++++++++++++++-
query_optimizer/ExecutionGenerator.cpp | 5 +
query_optimizer/resolver/Resolver.cpp | 8 +-
.../WindowAggregationOperator.cpp | 3 +-
storage/WindowAggregationOperationState.cpp | 25 +++-
storage/WindowAggregationOperationState.hpp | 5 +-
storage/WindowAggregationOperationState.proto | 1 +
13 files changed, 253 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index 9cc5d74..84d97fc 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -26,10 +26,12 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
+class CatalogRelationSchema;
class WindowAggregationHandle;
class Type;
@@ -128,6 +130,8 @@ class WindowAggregateFunction {
* is responsible for deleting the returned object.
**/
virtual WindowAggregationHandle* createHandle(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const = 0;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index e9a4453..06ff1d9 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -72,13 +72,17 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
}
WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
<< " that AVG can not be applied to.";
- return new WindowAggregationHandleAvg(*argument_types.front(),
+ return new WindowAggregationHandleAvg(relation,
+ block_ids,
+ *argument_types.front(),
std::move(partition_key_types));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 18e1022..91acf7e 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -57,6 +57,8 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
const std::vector<const Type*> &argument_types) const override;
WindowAggregationHandle* createHandle(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 77b1e76..6b7988a 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -99,14 +99,13 @@ class WindowAggregationHandle {
* @param output_destination The destination for output.
**/
virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<block_id> &block_ids,
const std::vector<attribute_id> &partition_by_ids,
- const CatalogRelationSchema &relation,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
- StorageManager *storage_manager,
- InsertDestinationInterface *output_destination) const = 0;
+ StorageManager *storage_manager) = 0;
+
+ virtual std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager) = 0;
protected:
/**
@@ -119,7 +118,14 @@ class WindowAggregationHandle {
* @param num_following The number of rows/range that follows the current row.
* @param storage_manager A pointer to the storage manager.
**/
- WindowAggregationHandle() {}
+ WindowAggregationHandle(const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids)
+ : block_ids_(block_ids),
+ relation_(relation) {}
+
+ std::vector<ColumnVector*> window_aggregates_;
+ const std::vector<block_id> block_ids_;
+ const CatalogRelationSchema &relation_;
private:
DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 62f5a88..e6e2894 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -25,6 +25,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
#include "storage/InsertDestinationInterface.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageManager.hpp"
@@ -51,9 +52,12 @@ namespace quickstep {
class StorageManager;
WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> &block_ids,
const Type &type,
std::vector<const Type*> &&partition_key_types)
- : argument_type_(type) {
+ : WindowAggregationHandle(relation, block_ids),
+ argument_type_(type) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_id;
@@ -98,24 +102,20 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
}
void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<block_id> &block_ids,
const std::vector<attribute_id> &partition_by_ids,
- const CatalogRelationSchema &relation,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
- StorageManager *storage_manager,
- InsertDestinationInterface *output_destination) const {
+ StorageManager *storage_manager) {
DCHECK(arguments.size() == 1);
- DCHECK(!block_ids.empty());
// Initialize the tuple accessors and argument accessors.
// Index of each value accessor indicates the block it belongs to.
std::vector<ValueAccessor*> tuple_accessors;
std::vector<ColumnVectorsValueAccessor*> argument_accessors;
- for (block_id bid : block_ids) {
+ for (block_id bid : block_ids_) {
// Get tuple accessor.
- BlockReference block = storage_manager->getBlock(bid, relation);
+ BlockReference block = storage_manager->getBlock(bid, relation_);
const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
tuple_accessors.push_back(tuple_accessor);
@@ -132,12 +132,14 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
// Create a window for each tuple and calculate the window aggregate.
for (std::uint32_t current_block_index = 0;
- current_block_index < block_ids.size();
+ current_block_index < block_ids_.size();
++current_block_index) {
ValueAccessor *tuple_accessor = tuple_accessors[current_block_index];
ColumnVectorsValueAccessor* argument_accessor =
argument_accessors[current_block_index];
-
+ NativeColumnVector window_aggregates_for_block(*result_type_,
+ argument_accessor->getNumTuples());
+
InvokeOnAnyValueAccessor (
tuple_accessor,
[&] (auto *tuple_accessor) -> void {
@@ -145,24 +147,48 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
argument_accessor->beginIteration();
while (tuple_accessor->next() && argument_accessor->next()) {
- TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
- argument_accessors,
- partition_by_ids,
- current_block_index,
- is_row,
- num_preceding,
- num_following);
- Tuple *current_tuple = tuple_accessor->getTuple();
- std::vector<TypedValue> new_tuple;
- for (TypedValue value : *current_tuple) {
- new_tuple.push_back(value);
- }
-
- new_tuple.push_back(window_aggregate);
- output_destination->insertTupleInBatch(Tuple(std::move(new_tuple)));
+ const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+ argument_accessors,
+ partition_by_ids,
+ current_block_index,
+ is_row,
+ num_preceding,
+ num_following);
+ window_aggregates_for_block.appendTypedValue(window_aggregate);
}
});
+
+ window_aggregates_.push_back(&window_aggregates_for_block);
+ }
+}
+
+std::vector<ValueAccessor*>&& WindowAggregationHandleAvg::finalize(
+ StorageManager *storage_manager) {
+ std::vector<ValueAccessor*> accessors;
+
+ // Create a ValueAccessor for each block, including the new window aggregate
+ // attribute.
+ for (std::size_t block_idx = 0; block_idx < block_ids_.size(); ++block_idx) {
+ // Get the block information.
+ BlockReference block = storage_manager->getBlock(block_idx, relation_);
+ const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+ ValueAccessor *block_accessor = tuple_block.createValueAccessor();
+ SubBlocksReference sub_block_ref(tuple_block,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ ColumnVectorsValueAccessor accessor;
+
+ for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+ attr_it != relation_.end();
+ ++attr_it) {
+ ScalarAttribute scalar_attr(*attr_it);
+ accessor.addColumn(scalar_attr.getAllValues(block_accessor, &sub_block_ref));
+ }
+
+ accessors.push_back(&accessor);
}
+
+ return std::move(accessors);
}
TypedValue WindowAggregationHandleAvg::calculateOneWindow(
@@ -287,7 +313,7 @@ bool WindowAggregationHandleAvg::samePartition(
const std::vector<attribute_id> &partition_by_ids) const {
return InvokeOnAnyValueAccessor (tuple_accessor,
[&] (auto *tuple_accessor) -> bool {
- for (std::uint32_t partition_by_index = 0;
+ for (std::size_t partition_by_index = 0;
partition_by_index < partition_by_ids.size();
++partition_by_index) {
if (!equal_comparators_[partition_by_index]->compareTypedValues(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 115152e..e0ec766 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -56,14 +56,13 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
~WindowAggregationHandleAvg() override {}
void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<block_id> &block_ids,
const std::vector<attribute_id> &partition_by_ids,
- const CatalogRelationSchema &relation,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
- StorageManager *storage_manager,
- InsertDestinationInterface *output_destination) const;
+ StorageManager *storage_manager);
+
+ std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager);
private:
friend class WindowAggregateFunctionAvg;
@@ -79,7 +78,9 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
* @param storage_manager A pointer to the storage manager.
* @param type Type of the avg value.
**/
- explicit WindowAggregationHandleAvg(const Type &type,
+ explicit WindowAggregationHandleAvg(const CatalogRelationSchema &relation,
+ const std::vector<block_id> &block_ids,
+ const Type &type,
std::vector<const Type*> &&partition_key_types);
TypedValue calculateOneWindow(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index 6a7d161..326afa7 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -51,12 +51,142 @@ namespace quickstep {
namespace {
- constexpr int kNumSamples = 100;
+ constexpr int kNumTuplesPerBlock = 100;
+ constexpr int kNumBlocks = 5;
+ constexpr int kNumTuplesPerPartition = 8;
} // namespace
-class WindowAggregationHandleAvgTest : public::testing::Test {
+// Attribute value could be null if set true.
+class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
protected:
+ virtual void SetUp() {
+ // Initialize relation and storage manager.
+ relation_.reset(new CatalogRelation(NULL, "TestRelation", kRelationId));
+ storage_manager_.reset(new StorageManager("TestAvg"));
+
+ // Add All kinds of TypedValues.
+ CatalogAttribute *int_attr = new CatalogAttribute(relation_.get(),
+ "int_attr",
+ TypeFactory::GetType(kInt, GetParam()));
+
+ relation_->addAttribute(int_attr);
+
+ CatalogAttribute *float_attr = new CatalogAttribute(relation_.get(),
+ "float_attr",
+ TypeFactory::GetType(kFloat, GetParam()));
+ relation_->addAttribute(float_attr);
+
+ CatalogAttribute *long_attr = new CatalogAttribute(relation_.get(),
+ "long_attr",
+ TypeFactory::GetType(kLong, GetParam()));
+ relation_->addAttribute(long_attr);
+
+ CatalogAttribute *double_attr = new CatalogAttribute(relation_.get(),
+ "double_attr",
+ TypeFactory::GetType(kDouble, GetParam()));
+ relation_->addAttribute(double_attr);
+
+ CatalogAttribute *char_attr = new CatalogAttribute(relation_.get(),
+ "char_attr",
+ TypeFactory::GetType(kChar, 4, GetParam()));
+ relation_->addAttribute(char_attr);
+
+ CatalogAttribute *varchar_attr = new CatalogAttribute(relation_.get(),
+ "varchar_attr",
+ TypeFactory::GetType(kVarChar, 32, GetParam()));
+ relation_->addAttribute(varchar_attr);
+
+ // Records the 'base_value' of a tuple used in createSampleTuple.
+ CatalogAttribute *partition_value = new CatalogAttribute(relation_.get(),
+ "partition_value",
+ TypeFactory::GetType(kInt, false));
+ relation_->addAttribute(partition_value);
+
+ StorageBlockLayout *layout = StorageBlockLayout::GenerateDefaultLayout(*relation_, true);
+
+ // Initialize blocks.
+ for (int i = 0; i < kNumBlocks; ++i) {
+ block_id bid = storage_manager_->createBlock(relation_, layout);
+ relation_->addBlock(bid);
+ insertTuples(bid);
+ }
+ }
+
+ // Insert kNumTuplesPerBlock tuples into the block.
+ void insertTuples(block_id bid) {
+ MutableBlockReference block = storage_manager_->getBlockMutable(bid, relation_);
+ for (int i = 0; i < kNumTuplesPerBlock; ++i) {
+ Tuple *tuple = createTuple(bid * kNumTuplesPerBlock + i);
+ block->insertTuple(*tuple);
+ }
+ }
+
+ Tuple* createTuple(int base_value) {
+ std::vector<TypedValue> attrs;
+
+ // int_attr.
+ if (GetParam() && base_value % 10 == 0) {
+ // Throw in a NULL integer for every ten values.
+ attrs.emplace_back(kInt);
+ } else {
+ attrs.emplace_back(base_value);
+ }
+
+ // float_attr.
+ if (GetParam() && base_value % 10 == 1) {
+ attrs.emplace_back(kFloat);
+ } else {
+ attrs.emplace_back(static_cast<float>(0.4 * base_value));
+ }
+
+ // long_attr.
+ if (GetParam() && base_value % 10 == 2) {
+ attrs.emplace_back(kLong);
+ } else {
+ attrs.emplace_back(static_cast<std::int64_t>(base_value));
+ }
+
+ // double_attr.
+ if (GetParam() && base_value % 10 == 3) {
+ attrs.emplace_back(kDouble);
+ } else {
+ attrs.emplace_back(static_cast<double>(0.25 * base_value));
+ }
+
+ // char_attr
+ if (GetParam() && base_value % 10 == 4) {
+ attrs.emplace_back(CharType::InstanceNullable(4).makeNullValue());
+ } else {
+ std::ostringstream char_buffer;
+ char_buffer << base_value;
+ std::string string_literal(char_buffer.str());
+ attrs.emplace_back(CharType::InstanceNonNullable(4).makeValue(
+ string_literal.c_str(),
+ string_literal.size() > 3 ? 4
+ : string_literal.size() + 1));
+ attrs.back().ensureNotReference();
+ }
+
+ // varchar_attr
+ if (GetParam() && base_value % 10 == 5) {
+ attrs.emplace_back(VarCharType::InstanceNullable(32).makeNullValue());
+ } else {
+ std::ostringstream char_buffer;
+ char_buffer << "Here are some numbers: " << base_value;
+ std::string string_literal(char_buffer.str());
+ attrs.emplace_back(VarCharType::InstanceNonNullable(32).makeValue(
+ string_literal.c_str(),
+ string_literal.size() + 1));
+ attrs.back().ensureNotReference();
+ }
+
+ // base_value
+ attrs.emplace_back(base_value / kNumTuplesPerPartition);
+ return new Tuple(std::move(attrs));
+}
+ }
+
// Handle initialization.
void initializeHandle(const Type &argument_type,
const std::vector<const Type*> &partition_key_types) {
@@ -86,11 +216,13 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
}
template <typename CppType>
- static void CheckAvgValue(
- CppType expected,
- const AggregationHandle &handle,
- const AggregationState &state) {
- EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
+ static void CheckAvgValues(
+ std::vector<CppType> expected,
+ const ColumnVector* actual) {
+ EXPECT_EQ(expected.size(), actual->size());
+ for (std::size_t i = 0; i < expected.size(); ++i) {
+ EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+ }
}
// Static templated method for set a meaningful value to data types.
@@ -237,6 +369,8 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
std::unique_ptr<StorageManager> storage_manager_;
+ std::unique_ptr<CatalogRelation> relation_;
+ std::vector<block_id> block_ids_;
};
const int AggregationHandleAvgTest::kNumSamples;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..06d47d2 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1656,6 +1656,11 @@ void ExecutionGenerator::convertWindowAggregate(
findRelationInfoOutputByPhysical(physical_plan->input());
window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
+ // Get relation blocks.
+ for (block_id bid : input_relation_info->relation->getBlocksSnapshot()) {
+ window_aggr_state_proto->add_block_ids(bid);
+ }
+
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
physical_plan->window_aggregate_expression();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index 11348fe..d28213c 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -2630,8 +2630,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
// TODO(Shixuan): We might want to create a new abstract class Function to
// include both AggregateFunction and WindowAggregateFunction, which will make
// this part of code cleaner.
- const ::quickstep::AggregateFunction *aggregate;
- const ::quickstep::WindowAggregateFunction *window_aggregate;
+ const ::quickstep::AggregateFunction *aggregate = nullptr;
+ const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
if (parse_function_call.isWindow()) {
window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
} else {
@@ -2670,7 +2670,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
|| (window_aggregate != nullptr && window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
if ((resolved_arguments.empty()) && !count_star) {
THROW_SQL_ERROR_AT(&parse_function_call)
- << "COUNT function requires an argument (either scalar or star (*))";
+ << "COUNT aggregate requires an argument (either scalar or star (*))";
}
}
@@ -2684,7 +2684,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
|| (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
THROW_SQL_ERROR_AT(&parse_function_call)
- << "Function " << aggregate->getName()
+ << "Aggregate function " << aggregate->getName()
<< " can not apply to the given argument(s).";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index ec2f27c..4c2e2b5 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -81,8 +81,7 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
void WindowAggregationWorkOrder::execute() {
- state_->windowAggregateBlocks(output_destination_,
- block_ids_);
+ state_->windowAggregateBlocks(output_destination_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 8d05b79..9029bd4 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -47,6 +47,7 @@ namespace quickstep {
WindowAggregationOperationState::WindowAggregationOperationState(
const CatalogRelationSchema &input_relation,
+ std::vector<block_id> &&block_ids,
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -55,6 +56,7 @@ WindowAggregationOperationState::WindowAggregationOperationState(
const std::int64_t num_following,
StorageManager *storage_manager)
: input_relation_(input_relation),
+ block_ids_(std::move(block_ids)),
arguments_(std::move(arguments)),
is_row_(is_row),
num_preceding_(num_preceding),
@@ -82,7 +84,9 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Create the handle and initial state.
window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(std::move(argument_types),
+ window_aggregate_function->createHandle(input_relation_,
+ block_ids_,
+ std::move(argument_types),
std::move(partition_by_types)));
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -109,6 +113,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto, database));
+ std::vector<block_id> block_ids;
+ for (int block_idx = 0; block_idx < proto.block_ids_size(); ++block_idx) {
+ block_ids.push_back(proto.block_ids(block_idx));
+ }
+
// Rebuild contructor arguments from their representation in 'proto'.
const WindowAggregateFunction *window_aggregate_function
= &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
@@ -135,6 +144,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
const std::int64_t num_following = proto.num_following();
return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+ std::move(block_ids),
window_aggregate_function,
std::move(arguments),
std::move(partition_by_attributes),
@@ -185,17 +195,18 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
}
void WindowAggregationOperationState::windowAggregateBlocks(
- InsertDestination *output_destination,
- const std::vector<block_id> &block_ids) {
+ InsertDestination *output_destination) {
window_aggregation_handle_->calculate(arguments_,
- block_ids,
partition_by_ids_,
- input_relation_,
is_row_,
num_preceding_,
num_following_,
- storage_manager_,
- output_destination);
+ storage_manager_);
+ std::vector<ValueAccessor*> output_accessors(
+ window_aggregation_handle_->finalize(storage_manager_));
+ for (ValueAccessor* output_accessor : output_accessors) {
+ output_destination->bulkInsertTuples(output_accessor);
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index f38dbd8..8116410 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -67,6 +67,7 @@ class WindowAggregationOperationState {
* tables.
*/
WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+ std::vector<block_id> &&block_ids,
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -112,11 +113,11 @@ class WindowAggregationOperationState {
* @param output_destination The output destination for the computed window
* aggregate.
**/
- void windowAggregateBlocks(InsertDestination *output_destination,
- const std::vector<block_id> &block_ids);
+ void windowAggregateBlocks(InsertDestination *output_destination);
private:
const CatalogRelationSchema &input_relation_;
+ const std::vector<block_id> block_ids_;
// TODO(Shixuan): Handle and State for window aggregation will be needed for
// actual calculation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index 4dc0a6a..c3b672c 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -24,6 +24,7 @@ import "expressions/Expressions.proto";
message WindowAggregationOperationState {
required int32 input_relation_id = 1;
+ repeated fixed64 block_ids = 2;
required WindowAggregateFunction function = 3;
repeated Scalar arguments = 4;
repeated Scalar partition_by_attributes = 5;