You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ra...@apache.org on 2016/08/05 22:52:38 UTC
[14/30] incubator-quickstep git commit: - Supported ROWS mode for AVG
window aggregation. - Created WindowAggregateFunctions in
expressions/window_aggregation. - Created WindowAggregationHandle for AVG to
actually do the calculation. - Other functions wi
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..c044a98
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+// Number of tuples in every generated test input.
+constexpr int kNumTuples{100};
+// Tuple i belongs to partition i / kNumTuplesPerPartition.
+constexpr int kNumTuplesPerPartition{8};
+// Tuples whose index is a multiple of this value get a NULL argument.
+constexpr int kNullInterval{25};
+// Frame bounds used by the sliding-window (ROWS) check.
+constexpr int kNumPreceding{2};
+constexpr int kNumFollowing{2};
+
+}  // namespace
+
+// Attribute value could be null if set true.
+// Test fixture for the AVG window aggregation handle. Generated argument
+// columns are nullable and hold a NULL at every tuple whose index is a
+// multiple of kNullInterval.
+class WindowAggregationHandleAvgTest : public ::testing::Test {
+ protected:
+  // Creates an AVG handle for one argument of 'argument_type' with a single
+  // non-nullable INT partition key, and stores it in handle_avg_.
+  void initializeHandle(const Type &argument_type) {
+    const WindowAggregateFunction &function =
+        WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg);
+    std::vector<const Type*> partition_key_types(1, &TypeFactory::GetType(kInt, false));
+    handle_avg_.reset(function.createHandle(std::vector<const Type*>(1, &argument_type),
+                                            std::move(partition_key_types)));
+  }
+
+  // Returns whether AVG accepts a single argument of type 'typeID'. CHAR and
+  // VARCHAR are instantiated with length 10.
+  static bool CanApplyToTypesTest(TypeID typeID) {
+    const Type &type = (typeID == kChar || typeID == kVarChar) ?
+        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
+        TypeFactory::GetType(typeID);
+
+    return WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).canApplyToTypes(
+        std::vector<const Type*>(1, &type));
+  }
+
+  // Returns whether AVG over a single 'input_type_id' argument yields a result
+  // of type 'output_type_id'. A null result type is reported as a mismatch
+  // instead of being dereferenced.
+  static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+                                             TypeID output_type_id) {
+    const Type *result_type =
+        WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).resultTypeForArgumentTypes(
+            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    return (result_type != nullptr) && (result_type->getTypeID() == output_type_id);
+  }
+
+  // Checks the handle's output column 'actual' row by row against 'expected';
+  // a nullptr entry means that row must be NULL.
+  template <typename CppType>
+  static void CheckAvgValues(
+      const std::vector<CppType*> &expected,
+      const ColumnVector *actual) {
+    // ASSERT rather than EXPECT: continuing after either failure would
+    // static_cast to the wrong type or index past the end of the column.
+    ASSERT_TRUE(actual->isNative());
+    const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+
+    ASSERT_EQ(expected.size(), native->size());
+    for (std::size_t i = 0; i < expected.size(); ++i) {
+      if (expected[i] == nullptr) {
+        EXPECT_TRUE(native->getTypedValue(i).isNull());
+      } else {
+        EXPECT_EQ(*expected[i], native->getTypedValue(i).getLiteral<CppType>());
+      }
+    }
+  }
+
+  // Assigns 'value' to '*data'. The interval literal types are handled by
+  // explicit specializations defined after the class.
+  template <typename CppType>
+  static void SetDataType(int value, CppType *data) {
+    *data = value;
+  }
+
+  // Returns a newly allocated sum / count, or nullptr (an expected NULL) when
+  // count is 0, i.e. no non-NULL value fell into the frame. The caller owns
+  // the returned object.
+  template <typename OutputCppType, typename SumCppType>
+  static OutputCppType* MakeExpectedAverage(const SumCppType &sum, const int count) {
+    if (count == 0) {
+      return nullptr;
+    }
+    OutputCppType *average = new OutputCppType;
+    *average = static_cast<OutputCppType>(sum) / count;
+    return average;
+  }
+
+  // Builds a nullable GenericType argument column and an INT partition-key
+  // column, then checks both the running (UNBOUNDED PRECEDING .. CURRENT ROW)
+  // and the sliding (kNumPreceding .. kNumFollowing) ROWS frames.
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkWindowAggregationAvgGeneric() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+
+    // Create argument, partition key and cpptype vectors.
+    std::vector<typename GenericType::cpptype*> argument_cpp_vector;
+    argument_cpp_vector.reserve(kNumTuples);
+    ColumnVector *argument_type_vector =
+        createArgumentGeneric<GenericType>(&argument_cpp_vector);
+    NativeColumnVector *partition_key_vector =
+        new NativeColumnVector(IntType::InstanceNonNullable(), kNumTuples);
+
+    // Consecutive runs of kNumTuplesPerPartition tuples share a partition key.
+    for (int i = 0; i < kNumTuples; ++i) {
+      partition_key_vector->appendTypedValue(TypedValue(i / kNumTuplesPerPartition));
+    }
+
+    // Create tuple ValueAccessor: column 0 is the partition key, column 1 the
+    // argument.
+    // NOTE(review): tuple_accessor and the result columns returned by
+    // calculate() are never freed. Ownership of the argument column after
+    // calculate() is not visible here; confirm before adding cleanup.
+    ColumnVectorsValueAccessor *tuple_accessor = new ColumnVectorsValueAccessor();
+    tuple_accessor->addColumn(partition_key_vector);
+    tuple_accessor->addColumn(argument_type_vector);
+
+    // Test UNBOUNDED PRECEDING AND CURRENT ROW.
+    checkAccumulate<GenericType, OutputType>(tuple_accessor,
+                                             argument_type_vector,
+                                             argument_cpp_vector);
+    // Test kNumPreceding PRECEDING AND kNumFollowing FOLLOWING.
+    checkSlidingWindow<GenericType, OutputType>(tuple_accessor,
+                                                argument_type_vector,
+                                                argument_cpp_vector);
+
+    // The checks are done; release the buffers allocated by
+    // createArgumentGeneric() (delete on a NULL entry is a no-op).
+    for (typename GenericType::cpptype *value : argument_cpp_vector) {
+      delete value;
+    }
+  }
+
+  // Returns a nullable column of kNumTuples GenericType values and appends a
+  // heap-allocated copy of each value (nullptr for NULL) to
+  // 'argument_cpp_vector'. The caller owns those copies.
+  template <typename GenericType>
+  ColumnVector *createArgumentGeneric(
+      std::vector<typename GenericType::cpptype*> *argument_cpp_vector) {
+    const GenericType &type = GenericType::Instance(true);
+    NativeColumnVector *column = new NativeColumnVector(type, kNumTuples);
+
+    for (int i = 0; i < kNumTuples; ++i) {
+      // Insert a NULL every kNullInterval tuples.
+      if (i % kNullInterval == 0) {
+        argument_cpp_vector->push_back(nullptr);
+        column->appendTypedValue(type.makeNullValue());
+        continue;
+      }
+
+      typename GenericType::cpptype *val = new typename GenericType::cpptype;
+
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, val);
+      } else {
+        // NOTE(review): SetDataType() takes an int, so this float is truncated
+        // toward zero and FLOAT/DOUBLE arguments end up integral. Making them
+        // fractional would also require a tolerant comparison for FLOAT sums.
+        SetDataType(static_cast<float>(i - 10) / 10, val);
+      }
+
+      column->appendTypedValue(type.makeValue(val));
+      argument_cpp_vector->push_back(val);
+    }
+
+    return column;
+  }
+
+  // Runs the handle with ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW and
+  // compares each row with the running average of the non-NULL values seen so
+  // far in its partition.
+  template <typename GenericType, typename OutputType>
+  void checkAccumulate(ColumnVectorsValueAccessor *tuple_accessor,
+                       ColumnVector *argument_type_vector,
+                       const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+    std::vector<ColumnVector*> arguments;
+    arguments.push_back(argument_type_vector);
+    // The partition key index is 0.
+    std::vector<attribute_id> partition_key(1, 0);
+
+    ColumnVector *result =
+        handle_avg_->calculate(tuple_accessor,
+                               std::move(arguments),
+                               partition_key,
+                               true /* is_row */,
+                               -1 /* num_preceding: UNBOUNDED PRECEDING */,
+                               0 /* num_following: CURRENT ROW */);
+
+    // Compute the expected running averages.
+    const std::size_t tuples_per_partition = kNumTuplesPerPartition;
+    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+    typename GenericType::cpptype sum;
+    SetDataType(0, &sum);
+    int count = 0;
+    for (std::size_t i = 0; i < argument_cpp_vector.size(); ++i) {
+      // Start of a new partition.
+      if (i % tuples_per_partition == 0) {
+        SetDataType(0, &sum);
+        count = 0;
+      }
+
+      const typename GenericType::cpptype *value = argument_cpp_vector[i];
+      if (value != nullptr) {
+        sum += *value;
+        ++count;
+      }
+
+      result_cpp_vector.push_back(
+          MakeExpectedAverage<typename OutputType::cpptype>(sum, count));
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+
+    for (typename OutputType::cpptype *expected_value : result_cpp_vector) {
+      delete expected_value;
+    }
+  }
+
+  // Runs the handle with ROWS BETWEEN kNumPreceding PRECEDING AND
+  // kNumFollowing FOLLOWING and compares each row with the average of the
+  // non-NULL values in its frame, clipped to the row's partition.
+  template <typename GenericType, typename OutputType>
+  void checkSlidingWindow(ColumnVectorsValueAccessor *tuple_accessor,
+                          ColumnVector *argument_type_vector,
+                          const std::vector<typename GenericType::cpptype*> &argument_cpp_vector) {
+    std::vector<ColumnVector*> arguments;
+    arguments.push_back(argument_type_vector);
+    // The partition key index is 0.
+    std::vector<attribute_id> partition_key(1, 0);
+
+    ColumnVector *result =
+        handle_avg_->calculate(tuple_accessor,
+                               std::move(arguments),
+                               partition_key,
+                               true /* is_row */,
+                               kNumPreceding,
+                               kNumFollowing);
+
+    // Unsigned copies of the constants so the index arithmetic below never
+    // mixes signed and unsigned operands.
+    const std::size_t num_tuples = argument_cpp_vector.size();
+    const std::size_t tuples_per_partition = kNumTuplesPerPartition;
+    const std::size_t num_preceding = kNumPreceding;
+    const std::size_t num_following = kNumFollowing;
+
+    // For each row, average the non-NULL values in its frame.
+    std::vector<typename OutputType::cpptype*> result_cpp_vector;
+    for (std::size_t i = 0; i < num_tuples; ++i) {
+      const std::size_t partition = i / tuples_per_partition;
+      typename GenericType::cpptype sum;
+      SetDataType(0, &sum);
+      int count = 0;
+
+      if (argument_cpp_vector[i] != nullptr) {
+        sum += *argument_cpp_vector[i];
+        ++count;
+      }
+
+      for (std::size_t precede = 1; precede <= num_preceding; ++precede) {
+        // Stop at the first tuple or at the partition start. Testing
+        // i < precede first keeps i - precede from wrapping around.
+        if (i < precede || (i - precede) / tuples_per_partition != partition) {
+          break;
+        }
+
+        if (argument_cpp_vector[i - precede] != nullptr) {
+          sum += *argument_cpp_vector[i - precede];
+          ++count;
+        }
+      }
+
+      // The frame extends num_following (not num_preceding) rows forward.
+      for (std::size_t follow = 1; follow <= num_following; ++follow) {
+        // Stop at the end of the input or at the partition end.
+        if (i + follow >= num_tuples || (i + follow) / tuples_per_partition != partition) {
+          break;
+        }
+
+        if (argument_cpp_vector[i + follow] != nullptr) {
+          sum += *argument_cpp_vector[i + follow];
+          ++count;
+        }
+      }
+
+      result_cpp_vector.push_back(
+          MakeExpectedAverage<typename OutputType::cpptype>(sum, count));
+    }
+
+    CheckAvgValues(result_cpp_vector, result);
+
+    for (typename OutputType::cpptype *expected_value : result_cpp_vector) {
+      delete expected_value;
+    }
+  }
+
+  std::unique_ptr<WindowAggregationHandle> handle_avg_;
+};
+
+// DOUBLE results (AVG over INT/LONG/FLOAT/DOUBLE arguments) are compared with
+// EXPECT_DOUBLE_EQ, which tolerates small rounding differences between the
+// expected value computed in the fixture and the handle's own arithmetic. The
+// primary template's exact EXPECT_EQ is kept for interval result types.
+template <>
+void WindowAggregationHandleAvgTest::CheckAvgValues<double>(
+    const std::vector<double*> &expected,
+    const ColumnVector *actual) {
+  // ASSERT rather than EXPECT: continuing after either failure would
+  // static_cast to the wrong type or index past the end of the column.
+  ASSERT_TRUE(actual->isNative());
+  const NativeColumnVector *native = static_cast<const NativeColumnVector*>(actual);
+
+  ASSERT_EQ(expected.size(), native->size());
+  for (std::size_t i = 0; i < expected.size(); ++i) {
+    if (expected[i] == nullptr) {
+      EXPECT_TRUE(native->getTypedValue(i).isNull());
+    } else {
+      EXPECT_DOUBLE_EQ(*expected[i], native->getTypedValue(i).getLiteral<double>());
+    }
+  }
+}
+
+// DatetimeIntervalLit is set through its tick count.
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(
+    int ticks, DatetimeIntervalLit *literal) {
+  literal->interval_ticks = ticks;
+}
+
+// YearMonthIntervalLit is set through its month count.
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(
+    int months, YearMonthIntervalLit *literal) {
+  literal->months = months;
+}
+
+// Suite-name alias for the death tests below; it shares the fixture above.
+using WindowAggregationHandleAvgDeathTest = WindowAggregationHandleAvgTest;
+
+// Numeric argument types: AVG produces a DOUBLE result.
+TEST_F(WindowAggregationHandleAvgTest, IntTypeTest) {
+  checkWindowAggregationAvgGeneric<IntType, DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeTest) {
+  checkWindowAggregationAvgGeneric<LongType, DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeTest) {
+  checkWindowAggregationAvgGeneric<FloatType, DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeTest) {
+  checkWindowAggregationAvgGeneric<DoubleType, DoubleType>();
+}
+
+// Interval argument types: AVG produces a result of the same interval type.
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeTest) {
+  checkWindowAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeTest) {
+  checkWindowAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+#ifdef QUICKSTEP_DEBUG
+// Building an AVG handle over a character-string argument is expected to
+// terminate the process; these checks are compiled only when QUICKSTEP_DEBUG
+// is defined.
+TEST_F(WindowAggregationHandleAvgDeathTest, CharTypeTest) {
+  const Type &char_type = CharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(char_type), "");
+}
+
+TEST_F(WindowAggregationHandleAvgDeathTest, VarTypeTest) {
+  const Type &varchar_type = VarCharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(varchar_type), "");
+}
+#endif  // QUICKSTEP_DEBUG
+
+// AVG applies to numeric and interval arguments, but not to character strings
+// or DATETIME. This is an ordinary test, not a death test, so it is registered
+// under the plain fixture rather than the DeathTest alias.
+TEST_F(WindowAggregationHandleAvgTest, canApplyToTypeTest) {
+  EXPECT_TRUE(CanApplyToTypesTest(kInt));
+  EXPECT_TRUE(CanApplyToTypesTest(kLong));
+  EXPECT_TRUE(CanApplyToTypesTest(kFloat));
+  EXPECT_TRUE(CanApplyToTypesTest(kDouble));
+  EXPECT_FALSE(CanApplyToTypesTest(kChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kVarChar));
+  EXPECT_FALSE(CanApplyToTypesTest(kDatetime));
+  EXPECT_TRUE(CanApplyToTypesTest(kDatetimeInterval));
+  EXPECT_TRUE(CanApplyToTypesTest(kYearMonthInterval));
+}
+
+// Numeric arguments average to DOUBLE; interval arguments keep their own type.
+// This is an ordinary test, not a death test, so it is registered under the
+// plain fixture rather than the DeathTest alias.
+TEST_F(WindowAggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kInt, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kLong, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kFloat, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDouble, kDouble));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kDatetimeInterval, kDatetimeInterval));
+  EXPECT_TRUE(ResultTypeForArgumentTypesTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_ExecutionHeuristics
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
// Get input.
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
- window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
execution_plan_->addRelationalOperator(
new WindowAggregationOperator(query_handle_->query_id(),
+ *input_relation_info->relation,
*output_relation,
window_aggr_state_index,
insert_destination_index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
glog
- quickstep_expressions_aggregation_AggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
quickstep_queryoptimizer_OptimizerTree
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_Expression
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
#include <utility>
#include <vector>
-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/Expression.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
}
WindowAggregateFunctionPtr WindowAggregateFunction::Create(
- const ::quickstep::AggregateFunction &window_aggregate,
+ const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..0cc80df 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,8 +33,8 @@
namespace quickstep {
-class AggregateFunction;
class Type;
+class WindowAggregateFunction;
namespace optimizer {
namespace expressions {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
* @return The WindowAggregateFunction singleton (from the expression system)
* for this node.
**/
- inline const ::quickstep::AggregateFunction& window_aggregate() const {
+ inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
return window_aggregate_;
}
@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
* @param is_distinct Whether this is a DISTINCT aggregation.
* @return A new AggregateFunctionPtr.
**/
- static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+ static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
* @param window_info The window info of the window aggregate function.
* @param is_distinct Indicates whether this is a DISTINCT aggregation.
*/
- WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+ WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
// window_aggregate_. If it really needs to be seperated from the
// AggregationFunction, a new class for WindowAggregationFunction should be
// created as quickstep::WindowAggregateFunction.
- const ::quickstep::AggregateFunction &window_aggregate_;
+ const ::quickstep::WindowAggregateFunction &window_aggregate_;
std::vector<ScalarPtr> arguments_;
const WindowInfo window_info_;
const std::string window_name_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index fb75767..9313e51 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -39,6 +39,8 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
quickstep_expressions_tablegenerator_GeneratorFunction
quickstep_expressions_tablegenerator_GeneratorFunctionFactory
quickstep_expressions_tablegenerator_GeneratorFunctionHandle
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
quickstep_parser_ParseAssignment
quickstep_parser_ParseBasicExpressions
quickstep_parser_ParseBlockProperties
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index f10378b..c224388 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -35,6 +35,8 @@
#include "expressions/table_generator/GeneratorFunction.hpp"
#include "expressions/table_generator/GeneratorFunctionFactory.hpp"
#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
#include "parser/ParseAssignment.hpp"
#include "parser/ParseBasicExpressions.hpp"
#include "parser/ParseBlockProperties.hpp"
@@ -2624,11 +2626,19 @@ E::ScalarPtr Resolver::resolveFunctionCall(
<< "COUNT aggregate has both star (*) and non-star arguments.";
}
- // Try to look up the AggregateFunction by name using
- // AggregateFunctionFactory.
- const ::quickstep::AggregateFunction *aggregate
- = AggregateFunctionFactory::GetByName(function_name);
- if (aggregate == nullptr) {
+ // Try to look up the AggregateFunction/WindowAggregationFunction by name.
+ // TODO(Shixuan): We might want to create a new abstract class Function to
+ // include both AggregateFunction and WindowAggregateFunction, which will make
+ // this part of code cleaner.
+ const ::quickstep::AggregateFunction *aggregate = nullptr;
+ const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
+ if (parse_function_call.isWindow()) {
+ window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
+ } else {
+ aggregate = AggregateFunctionFactory::GetByName(function_name);
+ }
+
+ if (aggregate == nullptr && window_aggregate == nullptr) {
THROW_SQL_ERROR_AT(&parse_function_call)
<< "Unrecognized function name \""
<< parse_function_call.name()->value()
@@ -2656,11 +2666,14 @@ E::ScalarPtr Resolver::resolveFunctionCall(
}
// Make sure a naked COUNT() with no arguments wasn't specified.
- if ((aggregate->getAggregationID() == AggregationID::kCount)
- && (resolved_arguments.empty())
- && (!count_star)) {
- THROW_SQL_ERROR_AT(&parse_function_call)
- << "COUNT aggregate requires an argument (either scalar or star (*))";
+ if ((aggregate != nullptr &&
+ aggregate->getAggregationID() == AggregationID::kCount) ||
+ (window_aggregate != nullptr &&
+ window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
+ if ((resolved_arguments.empty()) && !count_star) {
+ THROW_SQL_ERROR_AT(&parse_function_call)
+ << "COUNT aggregate requires an argument (either scalar or star (*))";
+ }
}
// Resolve each of the Scalar arguments to the aggregate.
@@ -2670,7 +2683,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
}
// Make sure that the aggregate can apply to the specified argument(s).
- if (!aggregate->canApplyToTypes(argument_types)) {
+ if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
+ || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
THROW_SQL_ERROR_AT(&parse_function_call)
<< "Aggregate function " << aggregate->getName()
<< " can not apply to the given argument(s).";
@@ -2679,7 +2693,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
if (parse_function_call.isWindow()) {
return resolveWindowAggregateFunction(parse_function_call,
expression_resolution_info,
- aggregate,
+ window_aggregate,
resolved_arguments);
}
@@ -2705,7 +2719,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
E::ScalarPtr Resolver::resolveWindowAggregateFunction(
const ParseFunctionCall &parse_function_call,
ExpressionResolutionInfo *expression_resolution_info,
- const ::quickstep::AggregateFunction *window_aggregate,
+ const ::quickstep::WindowAggregateFunction *window_aggregate,
const std::vector<E::ScalarPtr> &resolved_arguments) {
// A window aggregate function might be defined OVER a window name or a window.
E::WindowAggregateFunctionPtr window_aggregate_function;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index f4024e9..373430c 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -23,7 +23,6 @@
#include <unordered_set>
#include <vector>
-#include "query_optimizer/expressions/AggregateFunction.hpp"
#include "query_optimizer/expressions/Alias.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
@@ -460,14 +459,14 @@ class Resolver {
* @param expression_resolution_info Resolution info that contains the name
* resolver and info to be updated after
* resolution.
- * @param aggregate The aggregate function.
+ * @param aggregate The window aggregate function.
* @param resolved_arguments The resolved arguments.
* @return An expression in the query optimizer.
*/
expressions::ScalarPtr resolveWindowAggregateFunction(
const ParseFunctionCall &parse_function_call,
ExpressionResolutionInfo *expression_resolution_info,
- const ::quickstep::AggregateFunction *aggregate,
+ const ::quickstep::WindowAggregateFunction *aggregate,
const std::vector<expressions::ScalarPtr> &resolved_arguments);
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/query_optimizer/tests/execution_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Select.test b/query_optimizer/tests/execution_generator/Select.test
index 16127cc..30a3c39 100644
--- a/query_optimizer/tests/execution_generator/Select.test
+++ b/query_optimizer/tests/execution_generator/Select.test
@@ -953,19 +953,79 @@ WHERE double_col < 0
==
# Window Aggregation Test.
-# Currently this is not supported, an empty table will be returned.
-SELECT avg(int_col) OVER w FROM test
+SELECT char_col, long_col, avg(long_col) OVER w FROM test
WINDOW w AS
-(PARTITION BY char_col
- ORDER BY long_col DESC NULLS LAST
+(ORDER BY char_col DESC NULLS LAST
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
--
-+------------------------+
-|avg(int_col) |
-+------------------------+
-+------------------------+
++--------------------+--------------------+------------------------+
+|char_col |long_col |avg(long_col) |
++--------------------+--------------------+------------------------+
+| 8 2.828427| 64| 64|
+| 6 2.449490| 36| 50|
+| 4 2.000000| 16| 38.666666666666664|
+| 24 4.898979| 576| 173|
+| 22 4.690416| 484| 235.19999999999999|
+| 20 4.472136| 400| 262.66666666666669|
+| 2 1.414214| 4| 225.71428571428572|
+| 18 4.242641| 324| 238|
+| 16 4.000000| 256| 240|
+| 14 3.741657| 196| 235.59999999999999|
+| 12 3.464102| 144| 227.27272727272728|
+| 10 3.162278| 100| 216.66666666666666|
+| 0 0.000000| 0| 200|
+| -9 3.000000| 81| 191.5|
+| -7 2.645751| 49| 182|
+| -5 2.236068| 25| 172.1875|
+| -3 1.732051| 9| 162.58823529411765|
+| -23 4.795832| 529| 182.94444444444446|
+| -21 4.582576| 441| 196.52631578947367|
+| -19 4.358899| 361| 204.75|
+| -17 4.123106| 289| 208.76190476190476|
+| -15 3.872983| 225| 209.5|
+| -13 3.605551| 169| 207.7391304347826|
+| -11 3.316625| 121| 204.125|
+| -1 1.000000| 1| 196|
++--------------------+--------------------+------------------------+
==
+SELECT long_col, int_col, avg(int_col) OVER w FROM test
+WINDOW w AS
+(ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
+--
++--------------------+-----------+------------------------+
+|long_col |int_col |avg(int_col) |
++--------------------+-----------+------------------------+
+| 576| 24| 7.666666666666667|
+| 529| -23| 0.5|
+| 484| 22| 0.5|
+| 441| -21| -10.25|
+| 400| NULL| 0|
+| 361| -19| -9.75|
+| 324| 18| -0.5|
+| 289| -17| -3.3999999999999999|
+| 256| 16| 3.2000000000000002|
+| 225| -15| -3|
+| 196| 14| 2.7999999999999998|
+| 169| -13| -2.6000000000000001|
+| 144| 12| 0.5|
+| 121| -11| -5.25|
+| 100| NULL| 0|
+| 81| -9| -4.75|
+| 64| 8| -0.5|
+| 49| -7| -1.3999999999999999|
+| 36| 6| 1.2|
+| 25| -5| -1|
+| 16| 4| 0.80000000000000004|
+| 9| -3| -0.59999999999999998|
+| 4| 2| 0.5|
+| 1| -1| -0.66666666666666663|
+| 0| NULL| 0.5|
++--------------------+-----------+------------------------+
+==
+
+# Currently this is not supported, an empty table will be returned.
SELECT int_col, sum(float_col) OVER
(PARTITION BY char_col, long_col
ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
@@ -987,5 +1047,5 @@ WINDOW w AS
+------------------------+
|sum(avg(int_col)) |
+------------------------+
-| NULL|
+| -18|
+------------------------+
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 249441d..a51370b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -434,6 +434,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
quickstep_relationaloperators_WorkOrder
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
+ quickstep_storage_WindowAggregationOperationState
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_relationaloperators_WorkOrder
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 93cb9d4..3149864 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -21,11 +21,13 @@
#include <vector>
+#include "catalog/CatalogRelation.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
#include "tmb/id_typedefs.h"
@@ -40,10 +42,14 @@ bool WindowAggregationOperator::getAllWorkOrders(
DCHECK(query_context != nullptr);
if (blocking_dependencies_met_ && !generated_) {
+ std::vector<block_id> relation_blocks =
+ input_relation_.getBlocksSnapshot();
+
container->addNormalWorkOrder(
new WindowAggregationWorkOrder(
query_id_,
query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+ std::move(relation_blocks),
query_context->getInsertDestination(output_destination_index_)),
op_index_);
generated_ = true;
@@ -67,6 +73,11 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
proto->set_query_id(query_id_);
proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
window_aggregation_state_index_);
+
+ const std::vector<block_id> relation_blocks = input_relation_.getBlocksSnapshot();
+ for (const block_id bid : relation_blocks) {
+ proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
+ }
proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
output_destination_index_);
@@ -75,8 +86,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
 void WindowAggregationWorkOrder::execute() {
-  std::cout << "Window aggregation is not supported yet.\n"
-            << "An empty table is returned\n";
+  // The operator emits a single work order carrying every block of the input
+  // relation, so the whole window aggregation is computed by this one call.
+  state_->windowAggregateBlocks(output_destination_,
+                                block_ids_);
 }
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..bd83248 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -58,16 +58,19 @@ class WindowAggregationOperator : public RelationalOperator {
*
* @param query_id The ID of this query.
* @param input_relation The relation to perform aggregation over.
+ * @param output_relation The relation for output.
* @param window_aggregation_state_index The index of WindowAggregationState
* in QueryContext.
* @param output_destination_index The index of InsertDestination in
* QueryContext for the output.
**/
WindowAggregationOperator(const std::size_t query_id,
+ const CatalogRelation &input_relation,
const CatalogRelation &output_relation,
const QueryContext::window_aggregation_state_id window_aggregation_state_index,
const QueryContext::insert_destination_id output_destination_index)
: RelationalOperator(query_id),
+ input_relation_(input_relation),
output_relation_(output_relation),
window_aggregation_state_index_(window_aggregation_state_index),
output_destination_index_(output_destination_index),
@@ -99,6 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
**/
serialization::WorkOrder* createWorkOrderProto();
+ const CatalogRelation &input_relation_;
const CatalogRelation &output_relation_;
const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
const QueryContext::insert_destination_id output_destination_index_;
@@ -117,43 +121,25 @@ class WindowAggregationWorkOrder : public WorkOrder {
*
* @param query_id The ID of this query.
* @param state The WindowAggregationOperatorState to use.
+   * @param block_ids The IDs of the input relation's blocks.
* @param output_destination The InsertDestination for output.
**/
WindowAggregationWorkOrder(const std::size_t query_id,
WindowAggregationOperationState *state,
+ std::vector<block_id> &&block_ids,
InsertDestination *output_destination)
: WorkOrder(query_id),
state_(state),
+ block_ids_(std::move(block_ids)),
output_destination_(output_destination) {}
~WindowAggregationWorkOrder() override {}
- /**
- * @brief Get the pointer to WindowAggregationOperationState.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the window aggregation operation state.
- **/
- WindowAggregationOperationState* state() {
- return state_;
- }
-
- /**
- * @brief Get the pointer to output destination.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the output destination.
- **/
- InsertDestination* output_destination() {
- return output_destination_;
- }
-
void execute() override;
private:
WindowAggregationOperationState *state_;
+ const std::vector<block_id> block_ids_;
InsertDestination *output_destination_;
DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 69dee1b..076735f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -249,6 +249,7 @@ message WindowAggregationWorkOrder {
extend WorkOrder {
// All required
optional uint32 window_aggr_state_index = 336;
- optional int32 insert_destination_index = 337;
+ repeated fixed64 block_ids = 337;
+ optional int32 insert_destination_index = 338;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..582effd 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -1053,19 +1053,26 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
quickstep_catalog_CatalogTypedefs
quickstep_expressions_ExpressionFactories
quickstep_expressions_Expressions_proto
- quickstep_expressions_aggregation_AggregateFunction
- quickstep_expressions_aggregation_AggregateFunctionFactory
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_expressions_aggregation_AggregationID
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_storage_InsertDestination
quickstep_storage_StorageBlockInfo
quickstep_storage_StorageManager
+ quickstep_storage_SubBlocksReference
+ quickstep_storage_ValueAccessor
+ quickstep_storage_ValueAccessorUtil
quickstep_storage_WindowAggregationOperationState_proto
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorUtil
+ quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
- quickstep_expressions_aggregation_AggregateFunction_proto
quickstep_expressions_Expressions_proto
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
${PROTOBUF_LIBRARY})
# Module all-in-one library:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index a0bcc37..0cdfc1a 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -31,14 +31,21 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/ExpressionFactories.hpp"
#include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunctionFactory.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/InsertDestination.hpp"
#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/containers/ColumnVectorUtil.hpp"
#include "glog/logging.h"
@@ -46,23 +53,21 @@ namespace quickstep {
WindowAggregationOperationState::WindowAggregationOperationState(
const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
- std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
StorageManager *storage_manager)
: input_relation_(input_relation),
arguments_(std::move(arguments)),
- partition_by_attributes_(std::move(partition_by_attributes)),
is_row_(is_row),
num_preceding_(num_preceding),
num_following_(num_following),
storage_manager_(storage_manager) {
// Get the Types of this window aggregate's arguments so that we can create an
// AggregationHandle.
- // TODO(Shixuan): Next step: New handles for window aggregation function.
std::vector<const Type*> argument_types;
for (const std::unique_ptr<const Scalar> &argument : arguments_) {
argument_types.emplace_back(&argument->getType());
@@ -71,28 +76,18 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Check if window aggregate function could apply to the arguments.
DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
+  // IDs and types of partition keys.
+  std::vector<const Type*> partition_by_types;
+  partition_by_types.reserve(partition_by_attributes.size());
+  partition_by_ids_.reserve(partition_by_attributes.size());
+  for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+    const attribute_id partition_by_id =
+        partition_by_attribute->getAttributeIdForValueAccessor();
+    // A partition key that is not a plain attribute reports -1 here. That ID
+    // is later handed to the handle as if it named a real column, so reject
+    // it early instead of failing obscurely during calculation.
+    DCHECK_NE(-1, partition_by_id);
+    partition_by_ids_.push_back(partition_by_id);
+    partition_by_types.push_back(&partition_by_attribute->getType());
+  }
+
// Create the handle and initial state.
window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(argument_types));
- window_aggregation_state_.reset(
- window_aggregation_handle_->createInitialState());
-
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // See if all of this window aggregate's arguments are attributes in the input
- // relation. If so, remember the attribute IDs so that we can do copy elision
- // when actually performing the window aggregation.
- arguments_as_attributes_.reserve(arguments_.size());
- for (const std::unique_ptr<const Scalar> &argument : arguments_) {
- const attribute_id argument_id = argument->getAttributeIdForValueAccessor();
- if (argument_id == -1) {
- arguments_as_attributes_.clear();
- break;
- } else {
- DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
- arguments_as_attributes_.push_back(argument_id);
- }
- }
-#endif
+ window_aggregate_function->createHandle(std::move(argument_types),
+ std::move(partition_by_types)));
}
WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFromProto(
@@ -101,10 +96,6 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto, database));
- // Rebuild contructor arguments from their representation in 'proto'.
- const AggregateFunction *aggregate_function
- = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
-
std::vector<std::unique_ptr<const Scalar>> arguments;
arguments.reserve(proto.arguments_size());
for (int argument_idx = 0; argument_idx < proto.arguments_size(); ++argument_idx) {
@@ -126,10 +117,10 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
const std::int64_t num_preceding = proto.num_preceding();
const std::int64_t num_following = proto.num_following();
- return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
- aggregate_function,
+ return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+ &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function()),
std::move(arguments),
- std::move(partition_by_attributes),
+ partition_by_attributes,
is_row,
num_preceding,
num_following,
@@ -139,11 +130,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
const CatalogDatabaseLite &database) {
if (!proto.IsInitialized() ||
- !database.hasRelationWithId(proto.relation_id())) {
+ !database.hasRelationWithId(proto.input_relation_id())) {
return false;
}
- if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+ if (!WindowAggregateFunctionFactory::ProtoIsValid(proto.function())) {
return false;
}
@@ -176,4 +167,122 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
return true;
}
+void WindowAggregationOperationState::windowAggregateBlocks(
+    InsertDestination *output_destination,
+    const std::vector<block_id> &block_ids) {
+  // TODO(Shixuan): This is a quick fix for currently unsupported functions in
+  // order to pass the query_optimizer test.
+  if (!window_aggregation_handle_) {
+    std::cout << "The function will be supported in the near future :)\n";
+    return;
+  }
+
+  // TODO(Shixuan): RANGE frame mode should be supported to make SQL grammar
+  // work. This will need Order Key to be passed so that we know where the
+  // window should start and end.
+  if (!is_row_) {
+    std::cout << "Currently we don't support RANGE frame mode :(\n";
+    return;
+  }
+
+  // Count the tuples in all blocks so every column vector below can reserve
+  // its full length up front. Accumulate in std::size_t rather than int so a
+  // large relation cannot overflow the count.
+  std::size_t num_tuples = 0;
+  for (const block_id block_idx : block_ids) {
+    num_tuples += static_cast<std::size_t>(
+        storage_manager_->getBlock(block_idx, input_relation_)->getNumTuples());
+  }
+
+  // Construct column vectors for attributes.
+  std::vector<ColumnVector*> attribute_vecs;
+  attribute_vecs.reserve(input_relation_.size());
+  for (std::size_t attr_id = 0; attr_id < input_relation_.size(); ++attr_id) {
+    const CatalogAttribute *attr = input_relation_.getAttributeById(attr_id);
+    const Type &type = attr->getType();
+
+    if (NativeColumnVector::UsableForType(type)) {
+      attribute_vecs.push_back(new NativeColumnVector(type, num_tuples));
+    } else {
+      attribute_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+    }
+  }
+
+  // Construct column vectors for arguments.
+  std::vector<ColumnVector*> argument_vecs;
+  argument_vecs.reserve(arguments_.size());
+  for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+    const Type &type = argument->getType();
+
+    if (NativeColumnVector::UsableForType(type)) {
+      argument_vecs.push_back(new NativeColumnVector(type, num_tuples));
+    } else {
+      argument_vecs.push_back(new IndirectColumnVector(type, num_tuples));
+    }
+  }
+
+  // TODO(Shixuan): Add Support for Vector Copy Elision Selection.
+  // Add tuples and arguments into ColumnVectors.
+  for (const block_id block_idx : block_ids) {
+    BlockReference block = storage_manager_->getBlock(block_idx, input_relation_);
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+
+    // Both per-block accessors are owned locally so they are released as soon
+    // as this block has been copied into the column vectors.
+    std::unique_ptr<ValueAccessor> tuple_accessor(tuple_block.createValueAccessor());
+    std::unique_ptr<ColumnVectorsValueAccessor> argument_accessor(
+        new ColumnVectorsValueAccessor());
+    for (const std::unique_ptr<const Scalar> &argument : arguments_) {
+      argument_accessor->addColumn(argument->getAllValues(tuple_accessor.get(),
+                                                          &sub_block_ref));
+    }
+
+    InvokeOnAnyValueAccessor(tuple_accessor.get(),
+                             [&] (auto *accessor) -> void {  // NOLINT(build/c++11)
+      accessor->beginIteration();
+      argument_accessor->beginIteration();
+
+      // Copy each tuple's attribute values and its evaluated arguments.
+      while (accessor->next() && argument_accessor->next()) {
+        for (std::size_t attr_id = 0; attr_id < attribute_vecs.size(); ++attr_id) {
+          ColumnVector *attr_vec = attribute_vecs[attr_id];
+          if (attr_vec->isNative()) {
+            static_cast<NativeColumnVector*>(attr_vec)->appendTypedValue(
+                accessor->getTypedValue(attr_id));
+          } else {
+            static_cast<IndirectColumnVector*>(attr_vec)->appendTypedValue(
+                accessor->getTypedValue(attr_id));
+          }
+        }
+
+        for (std::size_t argument_idx = 0;
+             argument_idx < argument_vecs.size();
+             ++argument_idx) {
+          ColumnVector *argument = argument_vecs[argument_idx];
+          if (argument->isNative()) {
+            static_cast<NativeColumnVector*>(argument)->appendTypedValue(
+                argument_accessor->getTypedValue(argument_idx));
+          } else {
+            static_cast<IndirectColumnVector*>(argument)->appendTypedValue(
+                argument_accessor->getTypedValue(argument_idx));
+          }
+        }
+      }
+    });
+  }
+
+  // Construct the value accessor for tuples in all blocks. The accessor itself
+  // is owned here so it is freed on return; the attribute column vectors are
+  // handed to it via addColumn() (assumed to take ownership by default --
+  // confirm against ColumnVectorsValueAccessor).
+  std::unique_ptr<ColumnVectorsValueAccessor> all_blocks_accessor(
+      new ColumnVectorsValueAccessor());
+  for (ColumnVector *attr_vec : attribute_vecs) {
+    all_blocks_accessor->addColumn(attr_vec);
+  }
+
+  // Do actual calculation in handle.
+  ColumnVector *window_aggregates =
+      window_aggregation_handle_->calculate(all_blocks_accessor.get(),
+                                            std::move(argument_vecs),
+                                            partition_by_ids_,
+                                            is_row_,
+                                            num_preceding_,
+                                            num_following_);
+
+  // Append the aggregate as the last column and emit every tuple.
+  all_blocks_accessor->addColumn(window_aggregates);
+  output_destination->bulkInsertTuples(all_blocks_accessor.get());
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index d7b3e6a..9792a99 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -25,20 +25,20 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
#include "utility/Macros.hpp"
namespace quickstep {
-class AggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
+class WindowAggregateFunction;
/** \addtogroup Storage
* @{
@@ -63,13 +63,12 @@ class WindowAggregationOperationState {
* current row. -1 means UNBOUNDED PRECEDING.
* @param num_following The number of rows/range for the tuples following the
* current row. -1 means UNBOUNDED FOLLOWING.
- * @param storage_manager The StorageManager to use for allocating hash
- * tables.
+ * @param storage_manager The StorageManager to get block references.
*/
WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
- std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
+ const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
@@ -107,66 +106,29 @@ class WindowAggregationOperationState {
const CatalogDatabaseLite &database);
/**
- * @brief Get the is_row info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return True if the frame mode is ROW, false if it is RANGE.
- **/
- const bool is_row() const { return is_row_; }
-
- /**
- * @brief Get the num_preceding info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return The number of rows/range that precedes the current row.
- **/
- const std::int64_t num_preceding() const { return num_preceding_; }
-
- /**
- * @brief Get the num_following info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
+ * @brief Compute window aggregates on the tuples of the given relation.
*
- * @return The number of rows/range that follows the current row.
+ * @param output_destination The output destination for the computed window
+ * aggregate.
+   * @param block_ids The IDs of the input relation's blocks to aggregate over.
**/
- const std::int64_t num_following() const { return num_following_; }
-
- /**
- * @brief Get the pointer to StorageManager.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the storage manager.
- **/
- StorageManager *storage_manager() { return storage_manager_; }
+ void windowAggregateBlocks(InsertDestination *output_destination,
+ const std::vector<block_id> &block_ids);
private:
const CatalogRelationSchema &input_relation_;
-
- // TODO(Shixuan): Handle and State for window aggregation will be needed for
- // actual calculation.
- std::unique_ptr<AggregationHandle> window_aggregation_handle_;
- std::unique_ptr<AggregationState> window_aggregation_state_;
+ const std::vector<block_id> block_ids_;
+ std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
std::vector<std::unique_ptr<const Scalar>> arguments_;
+ std::vector<attribute_id> partition_by_ids_;
- // We don't add order_by_attributes here since it is not needed after sorting.
- std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
-
- // Window framing information.
+ // Frame info.
const bool is_row_;
const std::int64_t num_preceding_;
const std::int64_t num_following_;
StorageManager *storage_manager_;
-#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
- // If all an aggregate's argument expressions are simply attributes in
- // 'input_relation_', then this caches the attribute IDs of those arguments.
- std::vector<attribute_id> arguments_as_attributes_;
-#endif
-
DISALLOW_COPY_AND_ASSIGN(WindowAggregationOperationState);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c7bd0ef..d888461 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -19,12 +19,12 @@ syntax = "proto2";
package quickstep.serialization;
-import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/window_aggregation/WindowAggregateFunction.proto";
import "expressions/Expressions.proto";
message WindowAggregationOperationState {
- required int32 relation_id = 1;
- required AggregateFunction function = 2;
+ required int32 input_relation_id = 1;
+ required WindowAggregateFunction function = 2;
repeated Scalar arguments = 3;
repeated Scalar partition_by_attributes = 4;
required bool is_row = 5;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c0bb4620/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
index c572034..d58f0f5 100644
--- a/storage/tests/WindowAggregationOperationState_unittest.cpp
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -23,7 +23,7 @@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "storage/WindowAggregationOperationState.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
@@ -57,8 +57,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(kInvalidTableId);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(kInvalidTableId);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kValidNum);
proto.set_num_following(kValidNum);
@@ -67,8 +67,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kInvalidNum);
proto.set_num_following(kValidNum);
@@ -81,8 +81,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kValidNum);
proto.set_num_following(kValidNum);