You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/08/03 10:56:10 UTC
[2/7] incubator-quickstep git commit: RANGE mode and computation
optimization. - Supported RANGE mode for window aggregation. - Optimized the
AVG calculation time complexity from O(nk) to O(n),
where n is the number of tuples and k is the window size.
RANGE mode and computation optimization.
- Supported RANGE mode for window aggregation.
- Optimized the AVG calculation time complexity from O(nk) to O(n), where n is the number of tuples and k is the window size.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d0172fde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d0172fde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d0172fde
Branch: refs/heads/execution-dag-visualizer
Commit: d0172fde0cffbf10bc858090e11169e11834be89
Parents: e53186e
Author: shixuan <sh...@apache.org>
Authored: Tue Jul 26 11:49:07 2016 -0500
Committer: shixuan <sh...@apache.org>
Committed: Mon Aug 1 09:38:34 2016 -0500
----------------------------------------------------------------------
expressions/window_aggregation/CMakeLists.txt | 15 +-
.../WindowAggregateFunction.hpp | 19 +-
.../WindowAggregateFunctionAvg.cpp | 14 +-
.../WindowAggregateFunctionAvg.hpp | 6 +-
.../WindowAggregateFunctionCount.cpp | 6 +-
.../WindowAggregateFunctionCount.hpp | 6 +-
.../WindowAggregateFunctionMax.cpp | 6 +-
.../WindowAggregateFunctionMax.hpp | 6 +-
.../WindowAggregateFunctionMin.cpp | 6 +-
.../WindowAggregateFunctionMin.hpp | 6 +-
.../WindowAggregateFunctionSum.cpp | 6 +-
.../WindowAggregateFunctionSum.hpp | 6 +-
.../WindowAggregationHandle.cpp | 186 ++++++++++++++++
.../WindowAggregationHandle.hpp | 100 ++++++---
.../WindowAggregationHandleAvg.cpp | 201 ++++++-----------
.../WindowAggregationHandleAvg.hpp | 35 ++-
.../WindowAggregationHandleAvg_unittest.cpp | 220 +++++++++++++++----
query_optimizer/ExecutionGenerator.cpp | 11 +-
query_optimizer/resolver/Resolver.cpp | 19 +-
.../tests/execution_generator/Select.test | 41 +++-
storage/WindowAggregationOperationState.cpp | 69 +++---
storage/WindowAggregationOperationState.hpp | 9 +-
storage/WindowAggregationOperationState.proto | 1 +
23 files changed, 692 insertions(+), 302 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
index 6a16fcc..3a79b7e 100644
--- a/expressions/window_aggregation/CMakeLists.txt
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -44,7 +44,7 @@ add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionSum
WindowAggregateFunctionSum.cpp
WindowAggregateFunctionSum.hpp)
add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
- ../../empty_src.cpp
+ WindowAggregationHandle.cpp
WindowAggregationHandle.hpp)
add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
WindowAggregationHandleAvg.cpp
@@ -130,10 +130,17 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
glog
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_scalar_Scalar
quickstep_storage_StorageBlockInfo
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
quickstep_types_TypedValue
quickstep_types_containers_ColumnVector
quickstep_types_containers_ColumnVectorsValueAccessor
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
quickstep_types_operations_comparisons_Comparison
quickstep_types_operations_comparisons_ComparisonFactory
quickstep_types_operations_comparisons_ComparisonID
@@ -141,8 +148,6 @@ target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationH
target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
glog
quickstep_catalog_CatalogTypedefs
- quickstep_expressions_scalar_Scalar
- quickstep_expressions_scalar_ScalarAttribute
quickstep_expressions_windowaggregation_WindowAggregationHandle
quickstep_storage_ValueAccessor
quickstep_types_Type
@@ -179,11 +184,13 @@ add_executable(WindowAggregationHandle_tests
target_link_libraries(WindowAggregationHandle_tests
gtest
gtest_main
+ quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_scalar_Scalar
+ quickstep_expressions_scalar_ScalarAttribute
quickstep_expressions_windowaggregation_WindowAggregateFunction
quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
quickstep_expressions_windowaggregation_WindowAggregationHandle
- quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
quickstep_expressions_windowaggregation_WindowAggregationID
quickstep_storage_ValueAccessor
quickstep_types_CharType
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index e40479b..7ffc4ae 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -20,6 +20,7 @@
#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+#include <memory>
#include <string>
#include <vector>
@@ -32,6 +33,7 @@
namespace quickstep {
class CatalogRelationSchema;
+class Scalar;
class Type;
class WindowAggregationHandle;
@@ -120,16 +122,23 @@ class WindowAggregateFunction {
*
* @param argument_types A list of zero or more Types (in order) for
* arguments to this WindowAggregateFunction.
- * @param partition_key_types A list or zero or more Types for partition keys
- * to this WindowAggregateFunction.
+ * @param partition_by_attributes A list of attributes used as partition key.
+ * @param order_by_attributes A list of attributes used as order key.
+ * @param is_row True if the frame mode is ROWS, false if RANGE.
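+ * @param num_preceding The number of rows (ROWS mode) or the order-key
+ *        distance (RANGE mode) preceding the current row; -1 means
+ *        UNBOUNDED PRECEDING.
+ * @param num_following The number of rows (ROWS mode) or the order-key
+ *        distance (RANGE mode) following the current row; -1 means
+ *        UNBOUNDED FOLLOWING.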
*
* @return A new WindowAggregationHandle that can be used to compute this
- * WindowAggregateFunction over the specified argument_types. Caller
- * is responsible for deleting the returned object.
+ * WindowAggregateFunction over the specified window definition.
+ * Caller is responsible for deleting the returned object.
**/
virtual WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const = 0;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const = 0;
protected:
explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index bc31a53..beb1c7a 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -73,13 +73,21 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
<< " that AVG can not be applied to.";
- return new WindowAggregationHandleAvg(partition_key_types,
- *argument_types.front());
+ return new WindowAggregationHandleAvg(partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following,
+ argument_types[0]);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 32fd9d5..0e50415 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionAvg()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
index 504e000..ccd81ac 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.cpp
@@ -47,7 +47,11 @@ const Type* WindowAggregateFunctionCount::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionCount::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleCount for argument Types "
<< "that COUNT can not be applied to (> 1 argument).";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
index 1b40fdd..2e5506a 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionCount.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionCount : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionCount()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
index f3997c7..acfce82 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.cpp
@@ -55,7 +55,11 @@ const Type* WindowAggregateFunctionMax::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionMax::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleMax for argument Type(s) "
<< "that MAX can not be applied to.";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
index 00c788e..a215703 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMax.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionMax : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionMax()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
index a13e28e..cd845bd 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.cpp
@@ -55,7 +55,11 @@ const Type* WindowAggregateFunctionMin::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionMin::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleMin for argument Type(s) "
<< "that MIN can not be applied to.";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
index aeba539..fab88a8 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionMin.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionMin : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionMin()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
index 636c53a..e2aeb60 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.cpp
@@ -71,7 +71,11 @@ const Type* WindowAggregateFunctionSum::resultTypeForArgumentTypes(
WindowAggregationHandle* WindowAggregateFunctionSum::createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create a WindowAggregationHandleSum for argument Type(s) "
<< "that SUM can not be applied to.";
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
index 047113c..8d7d61d 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionSum.hpp
@@ -58,7 +58,11 @@ class WindowAggregateFunctionSum : public WindowAggregateFunction {
WindowAggregationHandle* createHandle(
const std::vector<const Type*> &argument_types,
- const std::vector<const Type*> &partition_key_types) const override;
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following) const override;
private:
WindowAggregateFunctionSum()
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.cpp b/expressions/window_aggregation/WindowAggregationHandle.cpp
new file mode 100644
index 0000000..835eaff
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.cpp
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+WindowAggregationHandle::WindowAggregationHandle(
+    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following)
+    : range_compare_type_(nullptr),  // Only set in RANGE mode below.
+      is_row_(is_row),
+      num_preceding_(num_preceding),
+      num_following_(num_following) {
+  // For every PARTITION BY key, record its ValueAccessor attribute ID and an
+  // equality comparator for its type. samePartition() uses both to decide
+  // whether two tuples belong to the same partition.
+  for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+    const Type &partition_key_type = partition_by_attribute->getType();
+    partition_key_ids_.push_back(
+        partition_by_attribute->getAttributeIdForValueAccessor());
+    partition_equal_comparators_.emplace_back(
+        ComparisonFactory::GetComparison(ComparisonID::kEqual)
+            .makeUncheckedComparatorForTypes(partition_key_type, partition_key_type));
+  }
+
+  // Record the ValueAccessor attribute ID of every ORDER BY key. Only the
+  // first order key's type is needed: RANGE frames are measured on it.
+  const Type *first_order_key_type = nullptr;
+  for (const std::unique_ptr<const Scalar> &order_by_attribute : order_by_attributes) {
+    order_key_ids_.push_back(
+        order_by_attribute->getAttributeIdForValueAccessor());
+    if (first_order_key_type == nullptr) {
+      first_order_key_type = &order_by_attribute->getType();
+    }
+  }
+
+  // In RANGE mode, inWindow() computes the frame boundaries as
+  //   first_order_key + (-num_preceding)  and  first_order_key + num_following
+  // so prepare an (order key type + Long) add operator and a <= comparator
+  // over the unifying type of the two. ROWS mode needs neither: it leaves
+  // range_add_operator_ and range_comparator_ empty and range_compare_type_
+  // null.
+  if (!is_row) {
+    DCHECK(first_order_key_type != nullptr)
+        << "RANGE mode requires at least one ORDER BY attribute.";
+
+    const Type &long_type = TypeFactory::GetType(kLong, false);
+    range_compare_type_ =
+        TypeFactory::GetUnifyingType(*first_order_key_type, long_type);
+
+    range_add_operator_.reset(
+        BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+            .makeUncheckedBinaryOperatorForTypes(*first_order_key_type, long_type));
+    range_comparator_.reset(
+        ComparisonFactory::GetComparison(ComparisonID::kLessOrEqual)
+            .makeUncheckedComparatorForTypes(*range_compare_type_, *range_compare_type_));
+  }
+}
+
+bool WindowAggregationHandle::samePartition(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const tuple_id test_tuple_id) const {
+  // A tuple ID outside [0, getNumTuples()) names no tuple, so it cannot share
+  // the current tuple's partition.
+  const bool out_of_bounds =
+      test_tuple_id < 0 || test_tuple_id >= tuple_accessor->getNumTuples();
+  if (out_of_bounds) {
+    return false;
+  }
+
+  // Every PARTITION BY key must compare equal between the accessor's current
+  // tuple and the test tuple. With no partition keys, all tuples match.
+  const std::size_t num_keys = partition_key_ids_.size();
+  for (std::size_t i = 0; i < num_keys; ++i) {
+    const attribute_id key_id = partition_key_ids_[i];
+    const bool keys_equal = partition_equal_comparators_[i]->compareTypedValues(
+        tuple_accessor->getTypedValue(key_id),
+        tuple_accessor->getTypedValueAtAbsolutePosition(key_id, test_tuple_id));
+    if (!keys_equal) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+bool WindowAggregationHandle::inWindow(
+    const ColumnVectorsValueAccessor *tuple_accessor,
+    const tuple_id test_tuple_id) const {
+  // A tuple that does not exist, or that belongs to a different partition,
+  // is never inside the current tuple's window frame.
+  if (!samePartition(tuple_accessor, test_tuple_id)) {
+    return false;
+  }
+
+  const tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();
+
+  // The current tuple is always inside its own frame.
+  if (test_tuple_id == current_tuple_id) {
+    return true;
+  }
+
+  if (is_row_) {
+    // ROWS mode: the frame is measured in tuple positions. A bound of -1
+    // means UNBOUNDED on that side.
+    if (num_preceding_ != -1 &&
+        test_tuple_id < current_tuple_id - num_preceding_) {
+      return false;
+    }
+    if (num_following_ != -1 &&
+        test_tuple_id > current_tuple_id + num_following_) {
+      return false;
+    }
+    return true;
+  }
+
+  // RANGE mode: the frame is measured on the value of the first order key,
+  // i.e. [current - num_preceding, current + num_following].
+  // NOTE(review): these bounds assume an ascending order key; confirm that a
+  // DESC order key is rejected or handled before reaching this point.
+  const Type &long_type = TypeFactory::GetType(kLong, false);
+
+  // Adding a Long zero passes the test tuple's order-key value through
+  // range_add_operator_, so it comes out in the same result type as the
+  // boundary values computed below with that operator.
+  const TypedValue test_value =
+      range_add_operator_->applyToTypedValues(
+          tuple_accessor->getTypedValueAtAbsolutePosition(order_key_ids_[0], test_tuple_id),
+          long_type.makeZeroValue());
+
+  // The current tuple's order-key value anchors both bounds; fetch it once.
+  const TypedValue current_value = tuple_accessor->getTypedValue(order_key_ids_[0]);
+
+  // A NULL order key on either side is considered out of range.
+  if (test_value.isNull() || current_value.isNull()) {
+    return false;
+  }
+
+  // Lower bound: current_value - num_preceding, unless UNBOUNDED (-1).
+  if (num_preceding_ > -1) {
+    // num_preceding is negated because only an add operator is available.
+    const std::int64_t neg_num_preceding = -num_preceding_;
+    const TypedValue start_boundary_value =
+        range_add_operator_->applyToTypedValues(
+            current_value, long_type.makeValue(&neg_num_preceding));
+    if (!range_comparator_->compareTypedValues(start_boundary_value, test_value)) {
+      return false;
+    }
+  }
+
+  // Upper bound: current_value + num_following, unless UNBOUNDED (-1).
+  if (num_following_ > -1) {
+    const TypedValue end_boundary_value =
+        range_add_operator_->applyToTypedValues(
+            current_value, long_type.makeValue(&num_following_));
+    if (!range_comparator_->compareTypedValues(test_value, end_boundary_value)) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 65f95d9..41d1d96 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -27,19 +27,23 @@
#include "catalog/CatalogRelationSchema.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "storage/StorageBlockInfo.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/operations/comparisons/Comparison.hpp"
#include "types/operations/comparisons/ComparisonFactory.hpp"
#include "types/operations/comparisons/ComparisonID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
-class InsertDestinationInterface;
class Scalar;
-class StorageManager;
class Type;
class ValueAccessor;
@@ -55,27 +59,29 @@ class ValueAccessor;
*
* A WindowAggregationHandle is created by calling
* WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
- * provides methods that are used to actually compute the window aggregate,
- * storing intermediate results in WindowAggregationState objects.
+ * provides methods that are used to actually compute the window aggregate.
*
* The work flow for computing a window aggregate is:
- * 1. Create an initial state by createInitialState().
- * 2. One thread will handle all the computation, iterating from the first
+ * 1. One thread will handle all the computation, iterating from the first
* tuple to the last tuple. Note there will be two modes that could be
* used upon different situations:
* a. If the window aggregate is defined as accumulative, which are:
* i. Functions applied to whole partition, such as rank(), ntile()
- * and dense_rank().
+ * and dense_rank(). (Not implemented yet).
* ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
* AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
* FOLLOWING".
* Then, for functions except median, we could store some global
- * values in the state without keeping all the tuple values around.
+ * values without keeping all the tuple values around. For simplicity,
+ * in avg(), count() and sum(), we treat the accumulative one as
+ * sliding window since the time complexity does not vary.
+ *    b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
+ *       3 FOLLOWING", the tuples of the current frame must stay reachable
+ *       (at minimum, via pointers to the frame's start and end tuples) so
+ *       that we know which values to drop as the window slides.
- * For each computed value, generate a tuple store in the column vector.
- * 3. Insert the new column into the original relation and return.
+ * For each computed value, generate a TypedValue and store it into a
+ * ColumnVector for window aggregate values.
+ * 2. Return the result ColumnVector.
*
* TODO(Shixuan): Currently we don't support parallelization. The basic idea for
* parallelization is to calculate the partial result inside each block. Each
@@ -96,37 +102,67 @@ class WindowAggregationHandle {
*
* @param block_accessors A pointer to the value accessor of block attributes.
* @param arguments The ColumnVectors of arguments
- * @param partition_by_ids The ids of partition keys.
- * @param is_row True if the frame mode is ROWS, false if it is RANGE.
- * @param num_preceding The number of rows/range that precedes the current row.
- * @param num_following The number of rows/range that follows the current row.
*
* @return A ColumnVector of the calculated window aggregates.
**/
virtual ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const = 0;
+ const std::vector<ColumnVector*> &arguments) const = 0;
protected:
/**
* @brief Constructor.
*
- * @param partition_key_types The Types of the partition key.
+ * @param partition_by_attributes A list of attributes used as partition key.
+ * @param order_by_attributes A list of attributes used as order key.
+ * @param is_row True if the frame mode is ROWS, false if RANGE.
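+   * @param num_preceding The number of rows (ROWS mode) or the order-key
+   *        distance (RANGE mode) preceding the current row; -1 means
+   *        UNBOUNDED PRECEDING.
+   * @param num_following The number of rows (ROWS mode) or the order-key
+   *        distance (RANGE mode) following the current row; -1 means
+   *        UNBOUNDED FOLLOWING.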
+ **/
+ WindowAggregationHandle(
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following);
+
+ /**
+ * @brief Check if test tuple is in the same partition as the current
+ * tuple in the accessor.
+ *
+ * @param tuple_accessor The ValueAccessor for tuples.
+ * @param test_tuple_id The id of the test tuple.
+ *
+ * @return True if test tuple is in the same partition as the current tuple in
+ * the accessor, false if not.
**/
- explicit WindowAggregationHandle(
- const std::vector<const Type*> &partition_key_types) {
- // Comparison operators for checking if two tuples belong to the same partition.
- for (const Type *partition_key_type : partition_key_types) {
- equal_comparators_.emplace_back(
- ComparisonFactory::GetComparison(ComparisonID::kEqual)
- .makeUncheckedComparatorForTypes(*partition_key_type, *partition_key_type));
- }
- }
-
- std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+ bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
+ const tuple_id test_tuple_id) const;
+
+ /**
+ * @brief Check if test tuple is in the defined range.
+ *
+ * @param tuple_accessor The ValueAccessor for tuples.
+ * @param test_tuple_id The id of the test tuple.
+ *
+ * @return True if test tuple is in the defined window, false if not.
+ **/
+ bool inWindow(const ColumnVectorsValueAccessor *tuple_accessor,
+ const tuple_id test_tuple_id) const;
+
+ // IDs and comparators for partition keys.
+ std::vector<attribute_id> partition_key_ids_;
+ std::vector<std::unique_ptr<UncheckedComparator>> partition_equal_comparators_;
+
+  // Order-key IDs, plus the add operator, <= comparator and comparison type
+  // used for frame-boundary checks in RANGE mode (unset in ROWS mode).
+ std::vector<attribute_id> order_key_ids_;
+ std::unique_ptr<UncheckedBinaryOperator> range_add_operator_;
+ std::unique_ptr<UncheckedComparator> range_comparator_; // Less than or Equal
+ const Type* range_compare_type_;
+
+ // Window frame information.
+ const bool is_row_;
+ const std::int64_t num_preceding_;
+ const std::int64_t num_following_;
private:
DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index a6a10d4..e6a4b3f 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -24,8 +24,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/scalar/Scalar.hpp"
-#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/Type.hpp"
#include "types/TypeFactory.hpp"
@@ -42,14 +41,21 @@
namespace quickstep {
WindowAggregationHandleAvg::WindowAggregationHandleAvg(
- const std::vector<const Type*> &partition_key_types,
- const Type &type)
- : WindowAggregationHandle(partition_key_types),
- argument_type_(type) {
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following,
+ const Type *argument_type)
+ : WindowAggregationHandle(partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_id;
- switch (type.getTypeID()) {
+ switch (argument_type->getTypeID()) {
case kInt:
case kLong:
type_id = kLong;
@@ -59,7 +65,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
type_id = kDouble;
break;
default:
- type_id = type.getTypeID();
+ type_id = argument_type->getTypeID();
break;
}
@@ -76,7 +82,13 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
// Add operator for summing argument values.
fast_add_operator_.reset(
BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
- .makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+ .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));
+
+ // Subtract operator for dropping argument values off the window.
+ fast_subtract_operator_.reset(
+ BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kSubtract)
+ .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));
+
// Divide operator for dividing sum by count to get final average.
divide_operator_.reset(
BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
@@ -85,11 +97,7 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
ColumnVector* WindowAggregationHandleAvg::calculate(
ColumnVectorsValueAccessor *tuple_accessor,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const {
+ const std::vector<ColumnVector*> &arguments) const {
DCHECK_EQ(1u, arguments.size());
DCHECK(arguments[0]->isNative());
DCHECK_EQ(static_cast<std::size_t>(tuple_accessor->getNumTuples()),
@@ -98,144 +106,69 @@ ColumnVector* WindowAggregationHandleAvg::calculate(
// Initialize the output column and argument accessor.
NativeColumnVector *window_aggregates =
new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());
- ColumnVectorsValueAccessor* argument_accessor = new ColumnVectorsValueAccessor();
+ ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
argument_accessor->addColumn(arguments[0]);
+ // Initialize the information about the window.
+ TypedValue sum = sum_type_->makeZeroValue();
+ std::uint64_t count = 0;
+ tuple_id start_tuple_id = 0; // The id of the first tuple in the window.
+ tuple_id end_tuple_id = 0; // The id of the tuple that just passed the last
+ // tuple in the window.
+
// Create a window for each tuple and calculate the window aggregate.
tuple_accessor->beginIteration();
argument_accessor->beginIteration();
while (tuple_accessor->next() && argument_accessor->next()) {
- const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessor,
- argument_accessor,
- partition_by_ids,
- is_row,
- num_preceding,
- num_following);
- window_aggregates->appendTypedValue(window_aggregate);
- }
-
- return window_aggregates;
-}
-
-TypedValue WindowAggregationHandleAvg::calculateOneWindow(
- ColumnVectorsValueAccessor *tuple_accessor,
- ColumnVectorsValueAccessor *argument_accessor,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const {
- // Initialize.
- TypedValue sum = sum_type_->makeZeroValue();
- TypedValue current_value = argument_accessor->getTypedValue(0);
- std::uint64_t count = 0;
-
- // Ignore the value if null.
- if (!current_value.isNull()) {
- sum = fast_add_operator_->applyToTypedValues(sum, current_value);
- count++;
- }
-
- // Get the partition key for the current row.
- std::vector<TypedValue> current_row_partition_key;
- for (attribute_id partition_by_id : partition_by_ids) {
- current_row_partition_key.push_back(
- tuple_accessor->getTypedValue(partition_by_id));
- }
-
- // Get current position.
- tuple_id current_tuple_id = tuple_accessor->getCurrentPositionVirtual();
-
- // Find preceding tuples.
- int count_preceding = 0;
- tuple_id preceding_tuple_id = current_tuple_id;
- while (num_preceding == -1 || count_preceding < num_preceding) {
- preceding_tuple_id--;
-
- // No more preceding tuples.
- if (preceding_tuple_id < 0) {
- break;
+ tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();
+
+ // If current tuple is not in the same partition as the previous tuple,
+ // reset the window.
+ if (!samePartition(tuple_accessor, current_tuple_id - 1)) {
+ start_tuple_id = current_tuple_id;
+ end_tuple_id = current_tuple_id;
+ count = 0;
+ sum = sum_type_->makeZeroValue();
}
- // Get the partition keys and compare. If not the same partition as the
- // current row, stop searching preceding tuples.
- if (!samePartition(tuple_accessor,
- current_row_partition_key,
- preceding_tuple_id,
- partition_by_ids)) {
- break;
+ // Drop tuples that will be out of the window from the beginning.
+ while (!inWindow(tuple_accessor, start_tuple_id)) {
+ TypedValue start_value =
+ argument_accessor->getTypedValueAtAbsolutePosition(0, start_tuple_id);
+ // Ignore the value if NULL.
+ if (!start_value.isNull()) {
+ sum = fast_subtract_operator_->applyToTypedValues(sum, start_value);
+ count--;
+ }
+
+ start_tuple_id++;
}
- // Actually count the element and do the calculation.
- count_preceding++;
- TypedValue preceding_value =
- argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id);
+ // Add tuples that will be included by the window at the end.
+ while (inWindow(tuple_accessor, end_tuple_id)) {
+ TypedValue end_value =
+ argument_accessor->getTypedValueAtAbsolutePosition(0, end_tuple_id);
- // Ignore the value if null.
- if (!preceding_value.isNull()) {
- sum = fast_add_operator_->applyToTypedValues(sum, preceding_value);
- count++;
- }
- }
-
- // Find following tuples.
- int count_following = 0;
- tuple_id following_tuple_id = current_tuple_id;
- while (num_following == -1 || count_following < num_following) {
- following_tuple_id++;
+ // Ignore the value if NULL.
+ if (!end_value.isNull()) {
+ sum = fast_add_operator_->applyToTypedValues(sum, end_value);
+ count++;
+ }
- // No more following tuples.
- if (following_tuple_id == tuple_accessor->getNumTuples()) {
- break;
+ end_tuple_id++;
}
- // Get the partition keys and compare. If not the same partition as the
- // current row, stop searching preceding tuples.
- if (!samePartition(tuple_accessor,
- current_row_partition_key,
- following_tuple_id,
- partition_by_ids)) {
- break;
+ // If all values are NULLs, return NULL; Otherwise, return the quotient.
+ if (count == 0) {
+ window_aggregates->appendTypedValue(result_type_->makeNullValue());
+ } else {
+ window_aggregates->appendTypedValue(
+ divide_operator_->applyToTypedValues(sum, TypedValue(static_cast<double>(count))));
}
-
- // Actually count the element and do the calculation.
- count_following++;
- TypedValue following_value =
- argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id);
-
- // Ignore the value if null.
- if (!following_value.isNull()) {
- sum = fast_add_operator_->applyToTypedValues(sum, following_value);
- count++;
- }
- }
-
- // If all values are NULLs, return NULL; Otherwise, return the quotient.
- if (count == 0) {
- return result_type_->makeNullValue();
- } else {
- return divide_operator_->applyToTypedValues(sum,
- TypedValue(static_cast<double>(count)));
}
-}
-bool WindowAggregationHandleAvg::samePartition(
- const ColumnVectorsValueAccessor *tuple_accessor,
- const std::vector<TypedValue> ¤t_row_partition_key,
- const tuple_id boundary_tuple_id,
- const std::vector<attribute_id> &partition_by_ids) const {
- for (std::size_t partition_by_index = 0;
- partition_by_index < partition_by_ids.size();
- ++partition_by_index) {
- if (!equal_comparators_[partition_by_index]->compareTypedValues(
- current_row_partition_key[partition_by_index],
- tuple_accessor->getTypedValueAtAbsolutePosition(
- partition_by_ids[partition_by_index], boundary_tuple_id))) {
- return false;
- }
- }
-
- return true;
+ return window_aggregates;
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 5b41779..f7f2e4d 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -31,7 +31,6 @@
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/operations/binary_operations/BinaryOperation.hpp"
-#include "types/operations/comparisons/Comparison.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
@@ -54,11 +53,7 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
~WindowAggregationHandleAvg() override {}
ColumnVector* calculate(ColumnVectorsValueAccessor* block_accessors,
- std::vector<ColumnVector*> &&arguments,
- const std::vector<attribute_id> &partition_by_ids,
- const bool is_row,
- const std::int64_t num_preceding,
- const std::int64_t num_following) const override;
+ const std::vector<ColumnVector*> &arguments) const override;
private:
friend class WindowAggregateFunctionAvg;
@@ -66,29 +61,25 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
/**
* @brief Constructor.
*
- * @param partition_key_types The Types of the partition key.
- * @param type Type of the avg value.
+ * @param partition_by_attributes A list of attributes used as partition key.
+ * @param order_by_attributes A list of attributes used as order key.
+ * @param is_row True if the frame mode is ROWS, false if RANGE.
+ * @param num_preceding The number of rows/range that precedes the current row.
+ * @param num_following The number of rows/range that follows the current row.
+ * @param argument_type Type of the argument.
**/
- WindowAggregationHandleAvg(const std::vector<const Type*> &partition_key_types,
- const Type &type);
-
- TypedValue calculateOneWindow(
- ColumnVectorsValueAccessor *tuple_accessor,
- ColumnVectorsValueAccessor *argument_accessor,
- const std::vector<attribute_id> &partition_by_ids,
+ WindowAggregationHandleAvg(
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
- const std::int64_t num_following) const;
-
- bool samePartition(const ColumnVectorsValueAccessor *tuple_accessor,
- const std::vector<TypedValue> ¤t_row_partition_key,
- const tuple_id boundary_tuple_id,
- const std::vector<attribute_id> &partition_by_ids) const;
+ const std::int64_t num_following,
+ const Type *argument_type);
- const Type &argument_type_;
const Type *sum_type_;
const Type *result_type_;
std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+ std::unique_ptr<UncheckedBinaryOperator> fast_subtract_operator_;
std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index c044a98..cb58083 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -23,11 +23,13 @@
#include <memory>
#include <vector>
+#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
-#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
#include "expressions/window_aggregation/WindowAggregationID.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/CharType.hpp"
@@ -58,6 +60,9 @@ namespace {
constexpr int kNullInterval = 25;
constexpr int kNumPreceding = 2;
constexpr int kNumFollowing = 2;
+ constexpr int kPartitionKeyIndex = 0;
+ constexpr int kOrderKeyIndex = 1;
+ constexpr int kNumTuplesPerOrderKey = 2;
} // namespace
@@ -65,12 +70,27 @@ namespace {
class WindowAggregationHandleAvgTest : public::testing::Test {
protected:
// Handle initialization.
- void initializeHandle(const Type &argument_type) {
+ WindowAggregationHandle* initializeHandle(const Type &argument_type,
+ const bool is_row = true,
+ const std::int64_t num_preceding = -1,
+ const std::int64_t num_following = 0) {
const WindowAggregateFunction &function =
WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+ const Type &int_type = TypeFactory::GetType(kInt, false);
+ std::vector<std::unique_ptr<const Scalar>> partition_by_attributes;
+ std::vector<std::unique_ptr<const Scalar>> order_by_attributes;
+ partition_by_attributes.emplace_back(
+ new ScalarAttribute(CatalogAttribute(nullptr, "partition_key", int_type, kPartitionKeyIndex)));
+ order_by_attributes.emplace_back(
+ new ScalarAttribute(CatalogAttribute(nullptr, "order_key", int_type, kOrderKeyIndex)));
std::vector<const Type*> partition_key_types(1, &TypeFactory::GetType(kInt, false));
- handle_avg_.reset(function.createHandle(std::vector<const Type*>(1, &argument_type),
- std::move(partition_key_types)));
+
+ return function.createHandle(std::vector<const Type*>(1, &argument_type),
+ partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following);
}
// Test canApplyToTypes().
@@ -117,24 +137,25 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
template <typename GenericType, typename OutputType = DoubleType>
void checkWindowAggregationAvgGeneric() {
- const GenericType &type = GenericType::Instance(true);
- initializeHandle(type);
-
// Create argument, partition key and cpptype vectors.
std::vector<typename GenericType::cpptype*> argument_cpp_vector;
argument_cpp_vector.reserve(kNumTuples);
ColumnVector *argument_type_vector =
createArgumentGeneric<GenericType>(&argument_cpp_vector);
NativeColumnVector *partition_key_vector =
- new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples + 2);
+ new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
+ NativeColumnVector *order_key_vector =
+ new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
for (int i = 0; i < kNumTuples; ++i) {
partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+ order_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerOrderKey));
}
// Create tuple ValueAccessor.
ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
tuple_accessor->addColumn(partition_key_vector);
+ tuple_accessor->addColumn(order_key_vector);
tuple_accessor->addColumn(argument_type_vector);
// Test UNBOUNDED PRECEDING AND CURRENT ROW.
@@ -182,45 +203,95 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
std::vector<ColumnVector*> arguments;
arguments.push_back(argument_type_vector);
- // The partition key index is 0.
- std::vector<attribute_id> partition_key(1, 0);
- ColumnVector *result =
- handle_avg_->calculate(tuple_accessor,
- std::move(arguments),
- partition_key,
- true /* is_row */,
- -1 /* num_preceding: UNBOUNDED PRECEDING */,
- 0 /* num_following: CURRENT ROW */);
+ // Check ROWS mode.
+ WindowAggregationHandle *rows_handle =
+ initializeHandle(GenericType::Instance(true),
+ true /* is_row */,
+ -1 /* num_preceding: UNBOUNDED PRECEDING */,
+ 0 /* num_following: CURRENT ROW */);
+ ColumnVector *rows_result =
+ rows_handle->calculate(tuple_accessor, arguments);
// Get the cpptype result.
- std::vector<typename OutputType::cpptype*> result_cpp_vector;
- typename GenericType::cpptype sum;
- int count;
+ std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
+ typename GenericType::cpptype rows_sum;
+ int rows_count;
for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
// Start of new partition
if (i % kNumTuplesPerPartition == 0) {
- SetDataType(0, &sum);
- count = 0;
+ SetDataType(0, &rows_sum);
+ rows_count = 0;
}
typename GenericType::cpptype *value = argument_cpp_vector[i];
if (value != nullptr) {
- sum += *value;
- count++;
+ rows_sum += *value;
+ rows_count++;
}
- if (count == 0) {
- result_cpp_vector.push_back(nullptr);
+ if (rows_count == 0) {
+ rows_result_cpp_vector.push_back(nullptr);
} else {
typename OutputType::cpptype *result_cpp_value =
new typename OutputType::cpptype;
- *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
- result_cpp_vector.push_back(result_cpp_value);
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(rows_sum) / rows_count;
+ rows_result_cpp_vector.push_back(result_cpp_value);
+ }
+ }
+
+ CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+ // Check RANGE mode.
+ WindowAggregationHandle *range_handle =
+ initializeHandle(GenericType::Instance(true),
+ false /* is_row */,
+ -1 /* num_preceding: UNBOUNDED PRECEDING */,
+ 0 /* num_following: CURRENT ROW */);
+ ColumnVector *range_result =
+ range_handle->calculate(tuple_accessor, arguments);
+
+ // Get the cpptype result.
+ std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+ typename GenericType::cpptype range_sum;
+ int range_count;
+ std::size_t current_tuple = 0;
+ while (current_tuple < kNumTuples) {
+ // Start of new partition
+ if (current_tuple % kNumTuplesPerPartition == 0) {
+ SetDataType(0, &range_sum);
+ range_count = 0;
+ }
+
+ // We have to consider following tuples with the same order key value.
+ std::size_t next_tuple = current_tuple;
+ while (next_tuple < kNumTuples &&
+ next_tuple / kNumTuplesPerPartition == current_tuple / kNumTuplesPerPartition &&
+ next_tuple / kNumTuplesPerOrderKey == current_tuple / kNumTuplesPerOrderKey) {
+ typename GenericType::cpptype *value = argument_cpp_vector[next_tuple];
+ if (value != nullptr) {
+ range_sum += *value;
+ range_count++;
+ }
+
+ next_tuple++;
+ }
+
+ // Calculate the result cpp value.
+ typename OutputType::cpptype *result_cpp_value = nullptr;
+ if (range_count != 0) {
+ result_cpp_value = new typename OutputType::cpptype;
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(range_sum) / range_count;
+ }
+
+ // Assign the result value to every tuple that has the same order key value.
+ while (current_tuple != next_tuple) {
+ range_result_cpp_vector.push_back(result_cpp_value);
+ current_tuple++;
}
}
- CheckAvgValues(result_cpp_vector, result);
+ CheckAvgValues(range_result_cpp_vector, range_result);
}
template <typename GenericType, typename OutputType>
@@ -229,20 +300,19 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
std::vector<ColumnVector*> arguments;
arguments.push_back(argument_type_vector);
- // The partition key index is 0.
- std::vector<attribute_id> partition_key(1, 0);
- ColumnVector *result =
- handle_avg_->calculate(tuple_accessor,
- std::move(arguments),
- partition_key,
- true /* is_row */,
- kNumPreceding,
- kNumFollowing);
+ // Check ROWS mode.
+ WindowAggregationHandle *rows_handle =
+ initializeHandle(GenericType::Instance(true),
+ true /* is_row */,
+ kNumPreceding,
+ kNumFollowing);
+ ColumnVector *rows_result =
+ rows_handle->calculate(tuple_accessor, arguments);
// Get the cpptype result.
// For each value, calculate all surrounding values in the window.
- std::vector<typename OutputType::cpptype*> result_cpp_vector;
+ std::vector<typename OutputType::cpptype*> rows_result_cpp_vector;
for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
typename GenericType::cpptype sum;
@@ -281,19 +351,81 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
}
if (count == 0) {
- result_cpp_vector.push_back(nullptr);
+ rows_result_cpp_vector.push_back(nullptr);
} else {
typename OutputType::cpptype *result_cpp_value =
new typename OutputType::cpptype;
*result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
- result_cpp_vector.push_back(result_cpp_value);
+ rows_result_cpp_vector.push_back(result_cpp_value);
}
}
- CheckAvgValues(result_cpp_vector, result);
- }
+ CheckAvgValues(rows_result_cpp_vector, rows_result);
+
+ // Check RANGE mode.
+ WindowAggregationHandle *range_handle =
+ initializeHandle(GenericType::Instance(true),
+ false /* is_row */,
+ kNumPreceding,
+ kNumFollowing);
+ ColumnVector *range_result =
+ range_handle->calculate(tuple_accessor, arguments);
+
+ // Get the cpptype result.
+ // For each value, calculate all surrounding values in the window.
+ std::vector<typename OutputType::cpptype*> range_result_cpp_vector;
+
+ for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+ typename GenericType::cpptype sum;
+ SetDataType(0, &sum);
+ int count = 0;
+
+ if (argument_cpp_vector[i] != nullptr) {
+ sum += *argument_cpp_vector[i];
+ count++;
+ }
+
+ int preceding_bound = i / kNumTuplesPerOrderKey - kNumPreceding;
+ for (std::size_t precede = 1; precede <= kNumTuples; ++precede) {
+ // Stop once the tuple is out of range or outside the current partition.
+ if (i / kNumTuplesPerPartition != (i - precede) / kNumTuplesPerPartition ||
+ static_cast<int>((i - precede) / kNumTuplesPerOrderKey) < preceding_bound) {
+ break;
+ }
+
+ if (argument_cpp_vector[i - precede] != nullptr) {
+ sum += *argument_cpp_vector[i - precede];
+ count++;
+ }
+ }
+
+ int following_bound = i / kNumTuplesPerOrderKey + kNumFollowing;
+ for (int follow = 1; follow <= kNumTuples; ++follow) {
+ // Stop once the tuple is out of range or outside the current partition.
+ if (i + follow >= kNumTuples ||
+ i / kNumTuplesPerPartition != (i + follow) / kNumTuplesPerPartition ||
+ static_cast<int>((i + follow) / kNumTuplesPerOrderKey) > following_bound) {
+ break;
+ }
+
+ if (argument_cpp_vector[i + follow] != nullptr) {
+ sum += *argument_cpp_vector[i + follow];
+ count++;
+ }
+ }
+
+ if (count == 0) {
+ rows_result_cpp_vector.push_back(nullptr);
+ } else {
+ typename OutputType::cpptype *result_cpp_value =
+ new typename OutputType::cpptype;
+ *result_cpp_value = static_cast<typename OutputType::cpptype>(sum) / count;
+ range_result_cpp_vector.push_back(result_cpp_value);
+ }
+ }
- std::unique_ptr<WindowAggregationHandle> handle_avg_;
+ CheckAvgValues(range_result_cpp_vector, range_result);
+ }
};
template <>
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..88103df 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1663,7 +1663,7 @@ void ExecutionGenerator::convertWindowAggregate(
std::static_pointer_cast<const E::WindowAggregateFunction>(
named_window_aggregate_expression->expression());
- // Set the AggregateFunction.
+ // Set the WindowAggregateFunction.
window_aggr_state_proto->mutable_function()->MergeFrom(
window_aggregate_function->window_aggregate().getProto());
@@ -1683,6 +1683,15 @@ void ExecutionGenerator::convertWindowAggregate(
->MergeFrom(concretized_partition_by_attribute->getProto());
}
+ // Set order keys.
+ for (const E::ScalarPtr &order_by_attribute
+ : window_info.order_by_attributes) {
+ unique_ptr<const Scalar> concretized_order_by_attribute(
+ order_by_attribute->concretize(attribute_substitution_map_));
+ window_aggr_state_proto->add_order_by_attributes()
+ ->MergeFrom(concretized_order_by_attribute->getProto());
+ }
+
// Set window frame info.
if (window_info.frame_info == nullptr) {
// If the frame is not specified, use the default setting:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index c224388..46808bf 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -814,9 +814,9 @@ L::LogicalPtr Resolver::resolveInsertSelection(
cast_expressions.emplace_back(selection_attributes[aid]);
} else {
// TODO(jianqiao): implement Cast operation for non-numeric types.
- if (destination_type.getSuperTypeID() == Type::kNumeric
- && selection_type.getSuperTypeID() == Type::kNumeric
- && destination_type.isSafelyCoercibleFrom(selection_type)) {
+ if (destination_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+ selection_type.getSuperTypeID() == Type::SuperTypeID::kNumeric &&
+ destination_type.isSafelyCoercibleFrom(selection_type)) {
// Add cast operation
const E::AttributeReferencePtr attr = selection_attributes[aid];
const E::ExpressionPtr cast_expr =
@@ -1691,6 +1691,19 @@ E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
// Resolve window frame
if (parse_window.frame_info() != nullptr) {
const quickstep::ParseFrameInfo *parse_frame_info = parse_window.frame_info();
+ // For RANGE mode, the first attribute in ORDER BY must be numeric.
+ // TODO(Shixuan): Time-related types should also be supported. To handle
+ // this, some changes in the parser need to be made since the time range
+ // should be specified with time units. Also, UNBOUNDED flags might be
+ // needed because -1 might not make sense in this case.
+ if (!parse_frame_info->is_row &&
+ (order_by_attributes.empty() ||
+ order_by_attributes[0]->getValueType().getSuperTypeID() != Type::SuperTypeID::kNumeric)) {
+ THROW_SQL_ERROR_AT(&parse_window)
+ << "A numeric attribute should be specified as the first ORDER BY "
+ << "attribute in FRAME mode";
+ }
+
frame_info = new E::WindowFrameInfo(parse_frame_info->is_row,
parse_frame_info->num_preceding,
parse_frame_info->num_following);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 30a3c39..6bada6c 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -1025,17 +1025,40 @@ WINDOW w AS
+--------------------+-----------+------------------------+
==
-# Currently this is not supported, an empty table will be returned.
-SELECT int_col, sum(float_col) OVER
-(PARTITION BY char_col, long_col
- ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
- RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+SELECT float_col, double_col, avg(double_col) OVER
+(ORDER BY float_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 2 PRECEDING AND 2 FOLLOWING)
FROM test;
--
-+-----------+------------------------+
-|int_col |sum(float_col) |
-+-----------+------------------------+
-+-----------+------------------------+
++---------------+------------------------+------------------------+
+|float_col |double_col |avg(double_col) |
++---------------+------------------------+------------------------+
+| 4.89897966| 117.57550765359254| -5.2010907233390986|
+| 4.79583168| -110.30412503619254| -3.3458568752518572|
+| 4.69041586| 103.18914671611546| -3.3458568752518572|
+| 4.5825758| -96.234089594072643| -4.2942570191393745|
+| 4.47213602| NULL| -4.2942570191393745|
+| 4.35889912| -82.81907992727281| -3.1771278735018194|
+| 4.2426405| 76.367532368147124| -3.1771278735018194|
+| 4.12310553| -70.092795635500224| -3.6217507631683268|
+| 4| 64| -3.0100796703699935|
+| 3.87298346| -58.094750193111253| -3.0100796703699935|
+| 3.7416575| 52.38320341483518| -3.0100796703699935|
+| 3.60555124| -46.872166581031856| -3.1193833079868254|
+| 3.46410155| 41.569219381653056| -3.1193833079868254|
+| 3.31662488| -36.4828726939094| -2.8361542397614437|
+| 3.1622777| NULL| -2.8361542397614437|
+| 3| -27| -2.7526926834086507|
+| 2.82842708| 22.627416997969522| -8.4826069851706123|
+| 2.64575124| -18.520259177452136| -9.0010404404476727|
+| 2.44948983| 14.696938456699067| -4.1547599319129516|
+| 2.23606801| -11.180339887498949| -4.2708832009567148|
+| 2| 8| 0.11724429467951912|
+| 1.73205078| -5.196152422706632| -4.7108157334609286|
+| 1.41421354| 2.8284271247461903| -5.1226841602152344|
+| 1| -1| -1.638218767582549|
+| 0| NULL| 1.1580686755098886|
++---------------+------------------------+------------------------+
==
SELECT sum(avg(int_col) OVER w) FROM test
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 0cdfc1a..49fa44d 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -56,15 +56,13 @@ WindowAggregationOperationState::WindowAggregationOperationState(
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
StorageManager *storage_manager)
: input_relation_(input_relation),
arguments_(std::move(arguments)),
- is_row_(is_row),
- num_preceding_(num_preceding),
- num_following_(num_following),
storage_manager_(storage_manager) {
// Get the Types of this window aggregate's arguments so that we can create an
// AggregationHandle.
@@ -76,18 +74,14 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Check if window aggregate function could apply to the arguments.
DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
- // IDs and types of partition keys.
- std::vector<const Type*> partition_by_types;
- for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
- partition_by_ids_.push_back(
- partition_by_attribute->getAttributeIdForValueAccessor());
- partition_by_types.push_back(&partition_by_attribute->getType());
- }
-
// Create the handle and initial state.
window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(std::move(argument_types),
- std::move(partition_by_types)));
+ window_aggregate_function->createHandle(argument_types,
+ partition_by_attributes,
+ order_by_attributes,
+ is_row,
+ num_preceding,
+ num_following));
}
WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -113,6 +107,15 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
database));
}
+ std::vector<std::unique_ptr<const Scalar>> order_by_attributes;  // Window ORDER BY keys rebuilt from proto field 8.
+ for (int attribute_idx = 0;
+ attribute_idx < proto.order_by_attributes_size();
+ ++attribute_idx) {
+ order_by_attributes.emplace_back(ScalarFactory::ReconstructFromProto(
+ proto.order_by_attributes(attribute_idx),
+ database));
+ }
+
const bool is_row = proto.is_row();
const std::int64_t num_preceding = proto.num_preceding();
const std::int64_t num_following = proto.num_following();
@@ -121,6 +124,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
&WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
std::move(arguments),
partition_by_attributes,
+ order_by_attributes,  // NOTE(review): passed by const&; this local (and the Scalars it owns) is destroyed when ReconstructFromProto returns -- confirm createHandle() copies what it needs rather than keeping pointers into it.
is_row,
num_preceding,
num_following,
@@ -160,6 +164,15 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
}
}
+ for (int attribute_idx = 0;  // Reject the proto if any ORDER BY key fails Scalar validation.
+ attribute_idx < proto.order_by_attributes_size();
+ ++attribute_idx) {
+ if (!ScalarFactory::ProtoIsValid(proto.order_by_attributes(attribute_idx),
+ database)) {
+ return false;
+ }
+ }
+
if (proto.num_preceding() < -1 || proto.num_following() < -1) {
return false;
}
@@ -177,14 +190,6 @@ void WindowAggregationOperationState::windowAggregateBlocks(
return;
}
- // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
- // work. This will need Order Key to be passed so that we know where the
- // window should start and end.
- if (!is_row_) {
- std::cout << "Currently we don't support RANGE frame mode :(\n";
- return;
- }
-
// Get the total number of tuples.
int num_tuples = 0;
for (const block_id block_idx : block_ids) {
@@ -226,7 +231,11 @@ void WindowAggregationOperationState::windowAggregateBlocks(
block->getIndices(),
block->getIndicesConsistent());
ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
- ColumnVectorsValueAccessor *argument_accessor = new ColumnVectorsValueAccessor();
+ ColumnVectorsValueAccessor *argument_accessor = nullptr;  // Stays null when arguments_ is empty; the uses visible below are null-checked.
+ if (!arguments_.empty()) {
+ argument_accessor = new ColumnVectorsValueAccessor();  // NOTE(review): raw owning new with no matching delete visible in this chunk -- confirm who frees it.
+ }
+
for (const std::unique_ptr<const Scalar> &argument : arguments_) {
argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
&sub_block_ref));
@@ -235,9 +244,15 @@ void WindowAggregationOperationState::windowAggregateBlocks(
InvokeOnAnyValueAccessor(tuple_accessor,
[&] (auto *tuple_accessor) -> void { // NOLINT(build/c++11)
tuple_accessor->beginIteration();
- argument_accessor->beginIteration();
+ if (argument_accessor != nullptr) {
+ argument_accessor->beginIteration();
+ }
+
+ while (tuple_accessor->next()) {  // Iteration is driven by tuple_accessor alone; argument_accessor (if any) advances in lockstep.
+ if (argument_accessor != nullptr) {
+ argument_accessor->next();
+ }
- while (tuple_accessor->next() && argument_accessor->next()) {
for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
ColumnVector *attr_vec = attribute_vecs[attr_id];
if (attr_vec->isNative()) {
@@ -275,11 +290,7 @@ void WindowAggregationOperationState::windowAggregateBlocks(
// Do actual calculation in handle.
ColumnVector *window_aggregates =
window_aggregation_handle_->calculate(all_blocks_accessor,
- std::move(argument_vecs),
- partition_by_ids_,
- is_row_,
- num_preceding_,
- num_following_);
+ argument_vecs);  // Keys and frame info were handed to the handle at construction.
all_blocks_accessor->addColumn(window_aggregates);
output_destination->bulkInsertTuples(all_blocks_accessor);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index 9792a99..726b102 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -57,6 +57,7 @@ class WindowAggregationOperationState {
* computed.
* @param arguments A list of argument expressions to that aggregate.
* @param partition_by_attributes A list of window partition key.
+ * @param order_by_attributes A list of window order keys, used to locate window boundaries in RANGE mode.
* @param is_row True if the window frame is calculated by ROW, false if it is
* calculated by RANGE.
* @param num_preceding The number of rows/range for the tuples preceding the
@@ -69,6 +70,7 @@ class WindowAggregationOperationState {
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
@@ -120,13 +122,6 @@ class WindowAggregationOperationState {
const std::vector<block_id> block_ids_;
std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
std::vector<std::unique_ptr<const Scalar>> arguments_;
- std::vector<attribute_id> partition_by_ids_;
-
- // Frame info.
- const bool is_row_;
- const std::int64_t num_preceding_;
- const std::int64_t num_following_;
-
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0172fde/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index d888461..f879713 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -30,4 +30,5 @@ message WindowAggregationOperationState {
required bool is_row = 5;
required int64 num_preceding = 6; // -1 means UNBOUNDED PRECEDING.
required int64 num_following = 7; // -1 means UNBOUNDED FOLLOWING.
+ repeated Scalar order_by_attributes = 8;  // Window ORDER BY keys (may be empty).
}