You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/11 14:12:20 UTC
[1/3] incubator-quickstep git commit: Added handle for avg
Repository: incubator-quickstep
Updated Branches:
refs/heads/SQL-window-aggregation [created] d5f535ee9
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index fb75767..9313e51 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -39,6 +39,8 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
quickstep_expressions_tablegenerator_GeneratorFunction
quickstep_expressions_tablegenerator_GeneratorFunctionFactory
quickstep_expressions_tablegenerator_GeneratorFunctionHandle
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
quickstep_parser_ParseAssignment
quickstep_parser_ParseBasicExpressions
quickstep_parser_ParseBlockProperties
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index f10378b..11348fe 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -35,6 +35,8 @@
#include "expressions/table_generator/GeneratorFunction.hpp"
#include "expressions/table_generator/GeneratorFunctionFactory.hpp"
#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
#include "parser/ParseAssignment.hpp"
#include "parser/ParseBasicExpressions.hpp"
#include "parser/ParseBlockProperties.hpp"
@@ -2624,11 +2626,19 @@ E::ScalarPtr Resolver::resolveFunctionCall(
<< "COUNT aggregate has both star (*) and non-star arguments.";
}
- // Try to look up the AggregateFunction by name using
- // AggregateFunctionFactory.
- const ::quickstep::AggregateFunction *aggregate
- = AggregateFunctionFactory::GetByName(function_name);
- if (aggregate == nullptr) {
+ // Try to look up the AggregateFunction/WindowAggregateFunction by name.
+ // TODO(Shixuan): We might want to create a new abstract class Function to
+ // include both AggregateFunction and WindowAggregateFunction, which will make
+ // this part of code cleaner.
+ const ::quickstep::AggregateFunction *aggregate;
+ const ::quickstep::WindowAggregateFunction *window_aggregate;
+ if (parse_function_call.isWindow()) {
+ window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
+ } else {
+ aggregate = AggregateFunctionFactory::GetByName(function_name);
+ }
+
+ if (aggregate == nullptr && window_aggregate == nullptr) {
THROW_SQL_ERROR_AT(&parse_function_call)
<< "Unrecognized function name \""
<< parse_function_call.name()->value()
@@ -2656,11 +2666,12 @@ E::ScalarPtr Resolver::resolveFunctionCall(
}
// Make sure a naked COUNT() with no arguments wasn't specified.
- if ((aggregate->getAggregationID() == AggregationID::kCount)
- && (resolved_arguments.empty())
- && (!count_star)) {
- THROW_SQL_ERROR_AT(&parse_function_call)
- << "COUNT aggregate requires an argument (either scalar or star (*))";
+ if ((aggregate != nullptr && aggregate->getAggregationID() == AggregationID::kCount)
+ || (window_aggregate != nullptr && window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
+ if ((resolved_arguments.empty()) && !count_star) {
+ THROW_SQL_ERROR_AT(&parse_function_call)
+ << "COUNT function requires an argument (either scalar or star (*))";
+ }
}
// Resolve each of the Scalar arguments to the aggregate.
@@ -2670,16 +2681,17 @@ E::ScalarPtr Resolver::resolveFunctionCall(
}
// Make sure that the aggregate can apply to the specified argument(s).
- if (!aggregate->canApplyToTypes(argument_types)) {
+ if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
+ || (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
THROW_SQL_ERROR_AT(&parse_function_call)
- << "Aggregate function " << aggregate->getName()
+ << "Function " << aggregate->getName()
<< " can not apply to the given argument(s).";
}
if (parse_function_call.isWindow()) {
return resolveWindowAggregateFunction(parse_function_call,
expression_resolution_info,
- aggregate,
+ window_aggregate,
resolved_arguments);
}
@@ -2705,7 +2717,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
E::ScalarPtr Resolver::resolveWindowAggregateFunction(
const ParseFunctionCall &parse_function_call,
ExpressionResolutionInfo *expression_resolution_info,
- const ::quickstep::AggregateFunction *window_aggregate,
+ const ::quickstep::WindowAggregateFunction *window_aggregate,
const std::vector<E::ScalarPtr> &resolved_arguments) {
// A window aggregate function might be defined OVER a window name or a window.
E::WindowAggregateFunctionPtr window_aggregate_function;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index f4024e9..373430c 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -23,7 +23,6 @@
#include <unordered_set>
#include <vector>
-#include "query_optimizer/expressions/AggregateFunction.hpp"
#include "query_optimizer/expressions/Alias.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
@@ -460,14 +459,14 @@ class Resolver {
* @param expression_resolution_info Resolution info that contains the name
* resolver and info to be updated after
* resolution.
- * @param aggregate The aggregate function.
+ * @param aggregate The window aggregate function.
* @param resolved_arguments The resolved arguments.
* @return An expression in the query optimizer.
*/
expressions::ScalarPtr resolveWindowAggregateFunction(
const ParseFunctionCall &parse_function_call,
ExpressionResolutionInfo *expression_resolution_info,
- const ::quickstep::AggregateFunction *aggregate,
+ const ::quickstep::WindowAggregateFunction *aggregate,
const std::vector<expressions::ScalarPtr> &resolved_arguments);
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 249441d..a51370b 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -434,6 +434,7 @@ target_link_libraries(quickstep_relationaloperators_WindowAggregationOperator
quickstep_relationaloperators_WorkOrder
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
+ quickstep_storage_WindowAggregationOperationState
quickstep_utility_Macros
tmb)
target_link_libraries(quickstep_relationaloperators_WorkOrder
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 93cb9d4..ec2f27c 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -21,11 +21,13 @@
#include <vector>
+#include "catalog/CatalogRelation.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
+#include "storage/WindowAggregationOperationState.hpp"
#include "tmb/id_typedefs.h"
@@ -44,6 +46,7 @@ bool WindowAggregationOperator::getAllWorkOrders(
new WindowAggregationWorkOrder(
query_id_,
query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+ block_ids_,
query_context->getInsertDestination(output_destination_index_)),
op_index_);
generated_ = true;
@@ -67,6 +70,9 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
proto->set_query_id(query_id_);
proto->SetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index,
window_aggregation_state_index_);
+ for (block_id bid : block_ids_) {
+ proto->AddExtension(serialization::WindowAggregationWorkOrder::block_ids, bid);
+ }
proto->SetExtension(serialization::WindowAggregationWorkOrder::insert_destination_index,
output_destination_index_);
@@ -75,8 +81,8 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
void WindowAggregationWorkOrder::execute() {
- std::cout << "Window aggregation is not supported yet.\n"
- << "An empty table is returned\n";
+ state_->windowAggregateBlocks(output_destination_,
+ block_ids_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/WindowAggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.hpp b/relational_operators/WindowAggregationOperator.hpp
index f3dfd14..4084ffc 100644
--- a/relational_operators/WindowAggregationOperator.hpp
+++ b/relational_operators/WindowAggregationOperator.hpp
@@ -58,16 +58,19 @@ class WindowAggregationOperator : public RelationalOperator {
*
* @param query_id The ID of this query.
* @param input_relation The relation to perform aggregation over.
+ * @param output_relation The relation for output.
* @param window_aggregation_state_index The index of WindowAggregationState
* in QueryContext.
* @param output_destination_index The index of InsertDestination in
* QueryContext for the output.
**/
WindowAggregationOperator(const std::size_t query_id,
+ const CatalogRelation &input_relation,
const CatalogRelation &output_relation,
const QueryContext::window_aggregation_state_id window_aggregation_state_index,
const QueryContext::insert_destination_id output_destination_index)
: RelationalOperator(query_id),
+ block_ids_(input_relation.getBlocksSnapshot()),
output_relation_(output_relation),
window_aggregation_state_index_(window_aggregation_state_index),
output_destination_index_(output_destination_index),
@@ -99,6 +102,7 @@ class WindowAggregationOperator : public RelationalOperator {
**/
serialization::WorkOrder* createWorkOrderProto();
+ const std::vector<block_id> block_ids_;
const CatalogRelation &output_relation_;
const QueryContext::window_aggregation_state_id window_aggregation_state_index_;
const QueryContext::insert_destination_id output_destination_index_;
@@ -121,39 +125,20 @@ class WindowAggregationWorkOrder : public WorkOrder {
**/
WindowAggregationWorkOrder(const std::size_t query_id,
WindowAggregationOperationState *state,
+ const std::vector<block_id> &block_ids,
InsertDestination *output_destination)
: WorkOrder(query_id),
state_(state),
+ block_ids_(block_ids),
output_destination_(output_destination) {}
~WindowAggregationWorkOrder() override {}
- /**
- * @brief Get the pointer to WindowAggregationOperationState.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the window aggregation operation state.
- **/
- WindowAggregationOperationState* state() {
- return state_;
- }
-
- /**
- * @brief Get the pointer to output destination.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return A pointer to the output destination.
- **/
- InsertDestination* output_destination() {
- return output_destination_;
- }
-
void execute() override;
private:
WindowAggregationOperationState *state_;
+ const std::vector<block_id> block_ids_;
InsertDestination *output_destination_;
DISALLOW_COPY_AND_ASSIGN(WindowAggregationWorkOrder);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 69dee1b..076735f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -249,6 +249,7 @@ message WindowAggregationWorkOrder {
extend WorkOrder {
// All required
optional uint32 window_aggr_state_index = 336;
- optional int32 insert_destination_index = 337;
+ repeated fixed64 block_ids = 337;
+ optional int32 insert_destination_index = 338;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 9df66e1..a0bb8ea 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -1053,19 +1053,20 @@ target_link_libraries(quickstep_storage_WindowAggregationOperationState
quickstep_catalog_CatalogTypedefs
quickstep_expressions_ExpressionFactories
quickstep_expressions_Expressions_proto
- quickstep_expressions_aggregation_AggregateFunction
- quickstep_expressions_aggregation_AggregateFunctionFactory
- quickstep_expressions_aggregation_AggregationHandle
- quickstep_expressions_aggregation_AggregationID
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_expressions_windowaggregation_WindowAggregationID
quickstep_storage_StorageBlockInfo
+ quickstep_storage_InsertDestination
quickstep_storage_StorageManager
quickstep_storage_WindowAggregationOperationState_proto
quickstep_utility_Macros)
target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
- quickstep_expressions_aggregation_AggregateFunction_proto
quickstep_expressions_Expressions_proto
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
${PROTOBUF_LIBRARY})
# Module all-in-one library:
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index a0bcc37..8d05b79 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -31,12 +31,13 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/ExpressionFactories.hpp"
#include "expressions/Expressions.pb.h"
-#include "expressions/aggregation/AggregateFunction.hpp"
-#include "expressions/aggregation/AggregateFunctionFactory.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
-#include "expressions/aggregation/AggregationID.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/InsertDestination.hpp"
#include "storage/StorageManager.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
@@ -46,7 +47,7 @@ namespace quickstep {
WindowAggregationOperationState::WindowAggregationOperationState(
const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
const bool is_row,
@@ -55,7 +56,6 @@ WindowAggregationOperationState::WindowAggregationOperationState(
StorageManager *storage_manager)
: input_relation_(input_relation),
arguments_(std::move(arguments)),
- partition_by_attributes_(std::move(partition_by_attributes)),
is_row_(is_row),
num_preceding_(num_preceding),
num_following_(num_following),
@@ -71,11 +71,19 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Check if window aggregate function could apply to the arguments.
DCHECK(window_aggregate_function->canApplyToTypes(argument_types));
+ // IDs and types of partition keys.
+ std::vector<attribute_id> partition_by_ids;
+ std::vector<const Type*> partition_by_types;
+ for (const std::unique_ptr<const Scalar> &partition_by_attribute : partition_by_attributes) {
+ partition_by_ids.push_back(
+ partition_by_attribute->getAttributeIdForValueAccessor());
+ partition_by_types.push_back(&partition_by_attribute->getType());
+ }
+
// Create the handle and initial state.
window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(argument_types));
- window_aggregation_state_.reset(
- window_aggregation_handle_->createInitialState());
+ window_aggregate_function->createHandle(std::move(argument_types),
+ std::move(partition_by_types)));
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
// See if all of this window aggregate's arguments are attributes in the input
@@ -88,7 +96,7 @@ WindowAggregationOperationState::WindowAggregationOperationState(
arguments_as_attributes_.clear();
break;
} else {
- DCHECK_EQ(input_relation_.getID(), argument->getRelationIdForValueAccessor());
+ DCHECK_EQ(input_relation.getID(), argument->getRelationIdForValueAccessor());
arguments_as_attributes_.push_back(argument_id);
}
}
@@ -102,8 +110,8 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
DCHECK(ProtoIsValid(proto, database));
// Rebuild constructor arguments from their representation in 'proto'.
- const AggregateFunction *aggregate_function
- = &AggregateFunctionFactory::ReconstructFromProto(proto.function());
+ const WindowAggregateFunction *window_aggregate_function
+ = &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
std::vector<std::unique_ptr<const Scalar>> arguments;
arguments.reserve(proto.arguments_size());
@@ -126,8 +134,8 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
const std::int64_t num_preceding = proto.num_preceding();
const std::int64_t num_following = proto.num_following();
- return new WindowAggregationOperationState(database.getRelationSchemaById(proto.relation_id()),
- aggregate_function,
+ return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+ window_aggregate_function,
std::move(arguments),
std::move(partition_by_attributes),
is_row,
@@ -139,11 +147,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAggregationOperationState &proto,
const CatalogDatabaseLite &database) {
if (!proto.IsInitialized() ||
- !database.hasRelationWithId(proto.relation_id())) {
+ !database.hasRelationWithId(proto.input_relation_id())) {
return false;
}
- if (!AggregateFunctionFactory::ProtoIsValid(proto.function())) {
+ if (!WindowAggregateFunctionFactory::ProtoIsValid(proto.function())) {
return false;
}
@@ -176,4 +184,18 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
return true;
}
+void WindowAggregationOperationState::windowAggregateBlocks(
+ InsertDestination *output_destination,
+ const std::vector<block_id> &block_ids) {
+ window_aggregation_handle_->calculate(arguments_,
+ block_ids,
+ partition_by_ids_,
+ input_relation_,
+ is_row_,
+ num_preceding_,
+ num_following_,
+ storage_manager_,
+ output_destination);
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index d7b3e6a..f38dbd8 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -25,7 +25,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -34,7 +34,7 @@
namespace quickstep {
-class AggregateFunction;
+class WindowAggregateFunction;
class CatalogDatabaseLite;
class CatalogRelationSchema;
class InsertDestination;
@@ -67,7 +67,7 @@ class WindowAggregationOperationState {
* tables.
*/
WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
- const AggregateFunction *window_aggregate_function,
+ const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
const bool is_row,
@@ -107,54 +107,25 @@ class WindowAggregationOperationState {
const CatalogDatabaseLite &database);
/**
- * @brief Get the is_row info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return True if the frame mode is ROW, false if it is RANGE.
- **/
- const bool is_row() const { return is_row_; }
-
- /**
- * @brief Get the num_preceding info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return The number of rows/range that precedes the current row.
- **/
- const std::int64_t num_preceding() const { return num_preceding_; }
-
- /**
- * @brief Get the num_following info.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
- *
- * @return The number of rows/range that follows the current row.
- **/
- const std::int64_t num_following() const { return num_following_; }
-
- /**
- * @brief Get the pointer to StorageManager.
- * @note This is a quickfix for "unused variable". After the window aggregate
- * functions are built, these methods might be dropped.
+ * @brief Compute window aggregates on the tuples of the given relation.
*
- * @return A pointer to the storage manager.
+ * @param output_destination The output destination for the computed window
+ * aggregate.
**/
- StorageManager *storage_manager() { return storage_manager_; }
+ void windowAggregateBlocks(InsertDestination *output_destination,
+ const std::vector<block_id> &block_ids);
private:
const CatalogRelationSchema &input_relation_;
// TODO(Shixuan): Handle and State for window aggregation will be needed for
// actual calculation.
- std::unique_ptr<AggregationHandle> window_aggregation_handle_;
- std::unique_ptr<AggregationState> window_aggregation_state_;
+ std::unique_ptr<WindowAggregationHandle> window_aggregation_handle_;
+
std::vector<std::unique_ptr<const Scalar>> arguments_;
+ std::vector<attribute_id> partition_by_ids_;
- // We don't add order_by_attributes here since it is not needed after sorting.
- std::vector<std::unique_ptr<const Scalar>> partition_by_attributes_;
-
- // Window framing information.
+ // Frame info.
const bool is_row_;
const std::int64_t num_preceding_;
const std::int64_t num_following_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index c7bd0ef..4dc0a6a 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -19,15 +19,15 @@ syntax = "proto2";
package quickstep.serialization;
-import "expressions/aggregation/AggregateFunction.proto";
+import "expressions/window_aggregation/WindowAggregateFunction.proto";
import "expressions/Expressions.proto";
message WindowAggregationOperationState {
- required int32 relation_id = 1;
- required AggregateFunction function = 2;
- repeated Scalar arguments = 3;
- repeated Scalar partition_by_attributes = 4;
- required bool is_row = 5;
- required int64 num_preceding = 6; // -1 means UNBOUNDED PRECEDING.
- required int64 num_following = 7; // -1 means UNBOUNDED FOLLOWING.
+ required int32 input_relation_id = 1;
+ required WindowAggregateFunction function = 3;
+ repeated Scalar arguments = 4;
+ repeated Scalar partition_by_attributes = 5;
+ required bool is_row = 6;
+ required int64 num_preceding = 7; // -1 means UNBOUNDED PRECEDING.
+ required int64 num_following = 8; // -1 means UNBOUNDED FOLLOWING.
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/storage/tests/WindowAggregationOperationState_unittest.cpp
----------------------------------------------------------------------
diff --git a/storage/tests/WindowAggregationOperationState_unittest.cpp b/storage/tests/WindowAggregationOperationState_unittest.cpp
index c572034..d58f0f5 100644
--- a/storage/tests/WindowAggregationOperationState_unittest.cpp
+++ b/storage/tests/WindowAggregationOperationState_unittest.cpp
@@ -23,7 +23,7 @@
#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
-#include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "storage/WindowAggregationOperationState.hpp"
#include "storage/WindowAggregationOperationState.pb.h"
@@ -57,8 +57,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, UninitializationTest) {
TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(kInvalidTableId);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(kInvalidTableId);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kValidNum);
proto.set_num_following(kValidNum);
@@ -67,8 +67,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidRelationIdTest) {
TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kInvalidNum);
proto.set_num_following(kValidNum);
@@ -81,8 +81,8 @@ TEST_F(WindowAggregationOperationStateProtoTest, InvalidNumTest) {
TEST_F(WindowAggregationOperationStateProtoTest, ValidTest) {
serialization::WindowAggregationOperationState proto;
- proto.set_relation_id(rel_id_);
- proto.mutable_function()->set_aggregation_id(serialization::AggregateFunction::AVG);
+ proto.set_input_relation_id(rel_id_);
+ proto.mutable_function()->set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
proto.set_is_row(true);
proto.set_num_preceding(kValidNum);
proto.set_num_following(kValidNum);
[3/3] incubator-quickstep git commit: Separated calculation into two
parts to check intermediate result
Posted by sh...@apache.org.
Separated calculation into two parts to check intermediate result
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d5f535ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d5f535ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d5f535ee
Branch: refs/heads/SQL-window-aggregation
Commit: d5f535ee9f0e3c7a8ea7615a07699b83c5f905f4
Parents: 2966405
Author: shixuan-fan <sh...@apache.org>
Authored: Mon Jul 11 14:11:47 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Mon Jul 11 14:11:47 2016 +0000
----------------------------------------------------------------------
.../WindowAggregateFunction.hpp | 4 +
.../WindowAggregateFunctionAvg.cpp | 6 +-
.../WindowAggregateFunctionAvg.hpp | 2 +
.../WindowAggregationHandle.hpp | 16 +-
.../WindowAggregationHandleAvg.cpp | 78 ++++++----
.../WindowAggregationHandleAvg.hpp | 11 +-
.../WindowAggregationHandleAvg_unittest.cpp | 148 ++++++++++++++++++-
query_optimizer/ExecutionGenerator.cpp | 5 +
query_optimizer/resolver/Resolver.cpp | 8 +-
.../WindowAggregationOperator.cpp | 3 +-
storage/WindowAggregationOperationState.cpp | 25 +++-
storage/WindowAggregationOperationState.hpp | 5 +-
storage/WindowAggregationOperationState.proto | 1 +
13 files changed, 253 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
index 9cc5d74..84d97fc 100644
--- a/expressions/window_aggregation/WindowAggregateFunction.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -26,10 +26,12 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"
namespace quickstep {
+class CatalogRelationSchema;
class WindowAggregationHandle;
class Type;
@@ -128,6 +130,8 @@ class WindowAggregateFunction {
* is responsible for deleting the returned object.
**/
virtual WindowAggregationHandle* createHandle(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const = 0;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
index e9a4453..06ff1d9 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -72,13 +72,17 @@ const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
}
WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const {
DCHECK(canApplyToTypes(argument_types))
<< "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
<< " that AVG can not be applied to.";
- return new WindowAggregationHandleAvg(*argument_types.front(),
+ return new WindowAggregationHandleAvg(relation,
+ block_ids,
+ *argument_types.front(),
std::move(partition_key_types));
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
index 18e1022..91acf7e 100644
--- a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -57,6 +57,8 @@ class WindowAggregateFunctionAvg : public WindowAggregateFunction {
const std::vector<const Type*> &argument_types) const override;
WindowAggregationHandle* createHandle(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids,
std::vector<const Type*> &&argument_types,
std::vector<const Type*> &&partition_key_types) const override;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
index 77b1e76..6b7988a 100644
--- a/expressions/window_aggregation/WindowAggregationHandle.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -99,14 +99,13 @@ class WindowAggregationHandle {
* @param output_destination The destination for output.
**/
virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<block_id> &block_ids,
const std::vector<attribute_id> &partition_by_ids,
- const CatalogRelationSchema &relation,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
- StorageManager *storage_manager,
- InsertDestinationInterface *output_destination) const = 0;
+ StorageManager *storage_manager) = 0;
+
+ virtual std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager) = 0;
protected:
/**
@@ -119,7 +118,14 @@ class WindowAggregationHandle {
* @param num_following The number of rows/range that follows the current row.
* @param storage_manager A pointer to the storage manager.
**/
- WindowAggregationHandle() {}
+ WindowAggregationHandle(const CatalogRelationSchema &relation,
+ const std::vector<block_id> block_ids)
+ : block_ids_(block_ids),
+ relation_(relation) {}
+
+ std::vector<ColumnVector*> window_aggregates_;
+ const std::vector<block_id> block_ids_;
+ const CatalogRelationSchema &relation_;
private:
DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
index 62f5a88..e6e2894 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -25,6 +25,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/scalar/Scalar.hpp"
+#include "expressions/scalar/ScalarAttribute.hpp"
#include "storage/InsertDestinationInterface.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageManager.hpp"
@@ -51,9 +52,12 @@ namespace quickstep {
class StorageManager;
WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+ const CatalogRelationSchema &relation,
+ const std::vector<block_id> &block_ids,
const Type &type,
std::vector<const Type*> &&partition_key_types)
- : argument_type_(type) {
+ : WindowAggregationHandle(relation, block_ids),
+ argument_type_(type) {
// We sum Int as Long and Float as Double so that we have more headroom when
// adding many values.
TypeID type_id;
@@ -98,24 +102,20 @@ WindowAggregationHandleAvg::WindowAggregationHandleAvg(
}
void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<block_id> &block_ids,
const std::vector<attribute_id> &partition_by_ids,
- const CatalogRelationSchema &relation,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
- StorageManager *storage_manager,
- InsertDestinationInterface *output_destination) const {
+ StorageManager *storage_manager) {
DCHECK(arguments.size() == 1);
- DCHECK(!block_ids.empty());
// Initialize the tuple accessors and argument accessors.
// Index of each value accessor indicates the block it belongs to.
std::vector<ValueAccessor*> tuple_accessors;
std::vector<ColumnVectorsValueAccessor*> argument_accessors;
- for (block_id bid : block_ids) {
+ for (block_id bid : block_ids_) {
// Get tuple accessor.
- BlockReference block = storage_manager->getBlock(bid, relation);
+ BlockReference block = storage_manager->getBlock(bid, relation_);
const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
ValueAccessor *tuple_accessor = tuple_block.createValueAccessor();
tuple_accessors.push_back(tuple_accessor);
@@ -132,12 +132,14 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
// Create a window for each tuple and calculate the window aggregate.
for (std::uint32_t current_block_index = 0;
- current_block_index < block_ids.size();
+ current_block_index < block_ids_.size();
++current_block_index) {
ValueAccessor *tuple_accessor = tuple_accessors[current_block_index];
ColumnVectorsValueAccessor* argument_accessor =
argument_accessors[current_block_index];
-
+ NativeColumnVector window_aggregates_for_block(*result_type_,
+ argument_accessor->getNumTuples());
+
InvokeOnAnyValueAccessor (
tuple_accessor,
[&] (auto *tuple_accessor) -> void {
@@ -145,24 +147,48 @@ void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<con
argument_accessor->beginIteration();
while (tuple_accessor->next() && argument_accessor->next()) {
- TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
- argument_accessors,
- partition_by_ids,
- current_block_index,
- is_row,
- num_preceding,
- num_following);
- Tuple *current_tuple = tuple_accessor->getTuple();
- std::vector<TypedValue> new_tuple;
- for (TypedValue value : *current_tuple) {
- new_tuple.push_back(value);
- }
-
- new_tuple.push_back(window_aggregate);
- output_destination->insertTupleInBatch(Tuple(std::move(new_tuple)));
+ const TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+ argument_accessors,
+ partition_by_ids,
+ current_block_index,
+ is_row,
+ num_preceding,
+ num_following);
+ window_aggregates_for_block.appendTypedValue(window_aggregate);
}
});
+
+ window_aggregates_.push_back(&window_aggregates_for_block);
+ }
+}
+
+std::vector<ValueAccessor*>&& WindowAggregationHandleAvg::finalize(
+ StorageManager *storage_manager) {
+ std::vector<ValueAccessor*> accessors;
+
+ // Create a ValueAccessor for each block, including the new window aggregate
+ // attribute.
+ for (std::size_t block_idx = 0; block_idx < block_ids_.size(); ++block_idx) {
+ // Get the block information.
+ BlockReference block = storage_manager->getBlock(block_idx, relation_);
+ const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+ ValueAccessor *block_accessor = tuple_block.createValueAccessor();
+ SubBlocksReference sub_block_ref(tuple_block,
+ block->getIndices(),
+ block->getIndicesConsistent());
+ ColumnVectorsValueAccessor accessor;
+
+ for (CatalogRelationSchema::const_iterator attr_it = relation_.begin();
+ attr_it != relation_.end();
+ ++attr_it) {
+ ScalarAttribute scalar_attr(*attr_it);
+ accessor.addColumn(scalar_attr.getAllValues(block_accessor, &sub_block_ref));
+ }
+
+ accessors.push_back(&accessor);
}
+
+ return std::move(accessors);
}
TypedValue WindowAggregationHandleAvg::calculateOneWindow(
@@ -287,7 +313,7 @@ bool WindowAggregationHandleAvg::samePartition(
const std::vector<attribute_id> &partition_by_ids) const {
return InvokeOnAnyValueAccessor (tuple_accessor,
[&] (auto *tuple_accessor) -> bool {
- for (std::uint32_t partition_by_index = 0;
+ for (std::size_t partition_by_index = 0;
partition_by_index < partition_by_ids.size();
++partition_by_index) {
if (!equal_comparators_[partition_by_index]->compareTypedValues(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
index 115152e..e0ec766 100644
--- a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -56,14 +56,13 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
~WindowAggregationHandleAvg() override {}
void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
- const std::vector<block_id> &block_ids,
const std::vector<attribute_id> &partition_by_ids,
- const CatalogRelationSchema &relation,
const bool is_row,
const std::int64_t num_preceding,
const std::int64_t num_following,
- StorageManager *storage_manager,
- InsertDestinationInterface *output_destination) const;
+ StorageManager *storage_manager);
+
+ std::vector<ValueAccessor*>&& finalize(StorageManager *storage_manager);
private:
friend class WindowAggregateFunctionAvg;
@@ -79,7 +78,9 @@ class WindowAggregationHandleAvg : public WindowAggregationHandle {
* @param storage_manager A pointer to the storage manager.
* @param type Type of the avg value.
**/
- explicit WindowAggregationHandleAvg(const Type &type,
+ explicit WindowAggregationHandleAvg(const CatalogRelationSchema &relation,
+ const std::vector<block_id> &block_ids,
+ const Type &type,
std::vector<const Type*> &&partition_key_types);
TypedValue calculateOneWindow(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
index 6a7d161..326afa7 100644
--- a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -51,12 +51,142 @@ namespace quickstep {
namespace {
- constexpr int kNumSamples = 100;
+ constexpr int kNumTuplesPerBlock = 100;
+ constexpr int kNumBlocks = 5;
+ constexpr int kNumTuplesPerPartition = 8;
} // namespace
-class WindowAggregationHandleAvgTest : public::testing::Test {
+// The bool test parameter controls whether attribute values may be NULL.
+class WindowAggregationHandleAvgTest : public::testing::TestWithParam<bool> {
protected:
+ virtual void SetUp() {
+ // Initialize relation and storage manager.
+ relation_.reset(new CatalogRelation(NULL, "TestRelation", kRelationId));
+ storage_manager_.reset(new StorageManager("TestAvg"));
+
+ // Add attributes of all kinds of types.
+ CatalogAttribute *int_attr = new CatalogAttribute(relation_.get(),
+ "int_attr",
+ TypeFactory::GetType(kInt, GetParam()));
+
+ relation_->addAttribute(int_attr);
+
+ CatalogAttribute *float_attr = new CatalogAttribute(relation_.get(),
+ "float_attr",
+ TypeFactory::GetType(kFloat, GetParam()));
+ relation_->addAttribute(float_attr);
+
+ CatalogAttribute *long_attr = new CatalogAttribute(relation_.get(),
+ "long_attr",
+ TypeFactory::GetType(kLong, GetParam()));
+ relation_->addAttribute(long_attr);
+
+ CatalogAttribute *double_attr = new CatalogAttribute(relation_.get(),
+ "double_attr",
+ TypeFactory::GetType(kDouble, GetParam()));
+ relation_->addAttribute(double_attr);
+
+ CatalogAttribute *char_attr = new CatalogAttribute(relation_.get(),
+ "char_attr",
+ TypeFactory::GetType(kChar, 4, GetParam()));
+ relation_->addAttribute(char_attr);
+
+ CatalogAttribute *varchar_attr = new CatalogAttribute(relation_.get(),
+ "varchar_attr",
+ TypeFactory::GetType(kVarChar, 32, GetParam()));
+ relation_->addAttribute(varchar_attr);
+
+ // Records the partition a tuple belongs to, derived from 'base_value' in createTuple.
+ CatalogAttribute *partition_value = new CatalogAttribute(relation_.get(),
+ "partition_value",
+ TypeFactory::GetType(kInt, false));
+ relation_->addAttribute(partition_value);
+
+ StorageBlockLayout *layout = StorageBlockLayout::GenerateDefaultLayout(*relation_, true);
+
+ // Initialize blocks.
+ for (int i = 0; i < kNumBlocks; ++i) {
+ block_id bid = storage_manager_->createBlock(relation_, layout);
+ relation_->addBlock(bid);
+ insertTuples(bid);
+ }
+ }
+
+ // Insert kNumTuplesPerBlock tuples into the block.
+ void insertTuples(block_id bid) {
+ MutableBlockReference block = storage_manager_->getBlockMutable(bid, relation_);
+ for (int i = 0; i < kNumTuplesPerBlock; ++i) {
+ Tuple *tuple = createTuple(bid * kNumTuplesPerBlock + i);
+ block->insertTuple(*tuple);
+ }
+ }
+
+ Tuple* createTuple(int base_value) {
+ std::vector<TypedValue> attrs;
+
+ // int_attr.
+ if (GetParam() && base_value % 10 == 0) {
+ // Throw in a NULL integer for every ten values.
+ attrs.emplace_back(kInt);
+ } else {
+ attrs.emplace_back(base_value);
+ }
+
+ // float_attr.
+ if (GetParam() && base_value % 10 == 1) {
+ attrs.emplace_back(kFloat);
+ } else {
+ attrs.emplace_back(static_cast<float>(0.4 * base_value));
+ }
+
+ // long_attr.
+ if (GetParam() && base_value % 10 == 2) {
+ attrs.emplace_back(kLong);
+ } else {
+ attrs.emplace_back(static_cast<std::int64_t>(base_value));
+ }
+
+ // double_attr.
+ if (GetParam() && base_value % 10 == 3) {
+ attrs.emplace_back(kDouble);
+ } else {
+ attrs.emplace_back(static_cast<double>(0.25 * base_value));
+ }
+
+ // char_attr
+ if (GetParam() && base_value % 10 == 4) {
+ attrs.emplace_back(CharType::InstanceNullable(4).makeNullValue());
+ } else {
+ std::ostringstream char_buffer;
+ char_buffer << base_value;
+ std::string string_literal(char_buffer.str());
+ attrs.emplace_back(CharType::InstanceNonNullable(4).makeValue(
+ string_literal.c_str(),
+ string_literal.size() > 3 ? 4
+ : string_literal.size() + 1));
+ attrs.back().ensureNotReference();
+ }
+
+ // varchar_attr
+ if (GetParam() && base_value % 10 == 5) {
+ attrs.emplace_back(VarCharType::InstanceNullable(32).makeNullValue());
+ } else {
+ std::ostringstream char_buffer;
+ char_buffer << "Here are some numbers: " << base_value;
+ std::string string_literal(char_buffer.str());
+ attrs.emplace_back(VarCharType::InstanceNonNullable(32).makeValue(
+ string_literal.c_str(),
+ string_literal.size() + 1));
+ attrs.back().ensureNotReference();
+ }
+
+ // partition_value
+ attrs.emplace_back(base_value / kNumTuplesPerPartition);
+ return new Tuple(std::move(attrs));
+}
+ }
+
// Handle initialization.
void initializeHandle(const Type &argument_type,
const std::vector<const Type*> &partition_key_types) {
@@ -86,11 +216,13 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
}
template <typename CppType>
- static void CheckAvgValue(
- CppType expected,
- const AggregationHandle &handle,
- const AggregationState &state) {
- EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
+ static void CheckAvgValues(
+ std::vector<CppType> expected,
+ const ColumnVector* actual) {
+ EXPECT_EQ(expected.size(), actual->size());
+ for (std::size_t i = 0; i < expected.size(); ++i) {
+ EXPECT_EQ(expected[i], actual->getTypedValue(i).getLiteral<CppType>());
+ }
}
// Static templated method for set a meaningful value to data types.
@@ -237,6 +369,8 @@ class WindowAggregationHandleAvgTest : public::testing::Test {
std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
std::unique_ptr<StorageManager> storage_manager_;
+ std::unique_ptr<CatalogRelation> relation_;
+ std::vector<block_id> block_ids_;
};
const int AggregationHandleAvgTest::kNumSamples;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index ce21ade..06d47d2 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -1656,6 +1656,11 @@ void ExecutionGenerator::convertWindowAggregate(
findRelationInfoOutputByPhysical(physical_plan->input());
window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
+ // Get relation blocks.
+ for (block_id bid : input_relation_info->relation->getBlocksSnapshot()) {
+ window_aggr_state_proto->add_block_ids(bid);
+ }
+
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
physical_plan->window_aggregate_expression();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index 11348fe..d28213c 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -2630,8 +2630,8 @@ E::ScalarPtr Resolver::resolveFunctionCall(
// TODO(Shixuan): We might want to create a new abstract class Function to
// include both AggregateFunction and WindowAggregateFunction, which will make
// this part of code cleaner.
- const ::quickstep::AggregateFunction *aggregate;
- const ::quickstep::WindowAggregateFunction *window_aggregate;
+ const ::quickstep::AggregateFunction *aggregate = nullptr;
+ const ::quickstep::WindowAggregateFunction *window_aggregate = nullptr;
if (parse_function_call.isWindow()) {
window_aggregate = WindowAggregateFunctionFactory::GetByName(function_name);
} else {
@@ -2670,7 +2670,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
|| (window_aggregate != nullptr && window_aggregate->getWindowAggregationID() == WindowAggregationID::kCount)) {
if ((resolved_arguments.empty()) && !count_star) {
THROW_SQL_ERROR_AT(&parse_function_call)
- << "COUNT function requires an argument (either scalar or star (*))";
+ << "COUNT aggregate requires an argument (either scalar or star (*))";
}
}
@@ -2684,7 +2684,7 @@ E::ScalarPtr Resolver::resolveFunctionCall(
if ((aggregate != nullptr && !aggregate->canApplyToTypes(argument_types))
|| (window_aggregate != nullptr && !window_aggregate->canApplyToTypes(argument_types))) {
THROW_SQL_ERROR_AT(&parse_function_call)
- << "Function " << aggregate->getName()
+ << "Aggregate function " << aggregate->getName()
<< " can not apply to the given argument(s).";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/relational_operators/WindowAggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index ec2f27c..4c2e2b5 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -81,8 +81,7 @@ serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
void WindowAggregationWorkOrder::execute() {
- state_->windowAggregateBlocks(output_destination_,
- block_ids_);
+ state_->windowAggregateBlocks(output_destination_);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.cpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 8d05b79..9029bd4 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -47,6 +47,7 @@ namespace quickstep {
WindowAggregationOperationState::WindowAggregationOperationState(
const CatalogRelationSchema &input_relation,
+ std::vector<block_id> &&block_ids,
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -55,6 +56,7 @@ WindowAggregationOperationState::WindowAggregationOperationState(
const std::int64_t num_following,
StorageManager *storage_manager)
: input_relation_(input_relation),
+ block_ids_(std::move(block_ids)),
arguments_(std::move(arguments)),
is_row_(is_row),
num_preceding_(num_preceding),
@@ -82,7 +84,9 @@ WindowAggregationOperationState::WindowAggregationOperationState(
// Create the handle and initial state.
window_aggregation_handle_.reset(
- window_aggregate_function->createHandle(std::move(argument_types),
+ window_aggregate_function->createHandle(input_relation_,
+ block_ids_,
+ std::move(argument_types),
std::move(partition_by_types)));
#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
@@ -109,6 +113,11 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
StorageManager *storage_manager) {
DCHECK(ProtoIsValid(proto, database));
+ std::vector<block_id> block_ids;
+ for (int block_idx = 0; block_idx < proto.block_ids_size(); ++block_idx) {
+ block_ids.push_back(proto.block_ids(block_idx));
+ }
+
// Rebuild contructor arguments from their representation in 'proto'.
const WindowAggregateFunction *window_aggregate_function
= &WindowAggregateFunctionFactory::ReconstructFromProto(proto.function());
@@ -135,6 +144,7 @@ WindowAggregationOperationState* WindowAggregationOperationState::ReconstructFro
const std::int64_t num_following = proto.num_following();
return new WindowAggregationOperationState(database.getRelationSchemaById(proto.input_relation_id()),
+ std::move(block_ids),
window_aggregate_function,
std::move(arguments),
std::move(partition_by_attributes),
@@ -185,17 +195,18 @@ bool WindowAggregationOperationState::ProtoIsValid(const serialization::WindowAg
}
void WindowAggregationOperationState::windowAggregateBlocks(
- InsertDestination *output_destination,
- const std::vector<block_id> &block_ids) {
+ InsertDestination *output_destination) {
window_aggregation_handle_->calculate(arguments_,
- block_ids,
partition_by_ids_,
- input_relation_,
is_row_,
num_preceding_,
num_following_,
- storage_manager_,
- output_destination);
+ storage_manager_);
+ std::vector<ValueAccessor*> output_accessors(
+ window_aggregation_handle_->finalize(storage_manager_));
+ for (ValueAccessor* output_accessor : output_accessors) {
+ output_destination->bulkInsertTuples(output_accessor);
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.hpp
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.hpp b/storage/WindowAggregationOperationState.hpp
index f38dbd8..8116410 100644
--- a/storage/WindowAggregationOperationState.hpp
+++ b/storage/WindowAggregationOperationState.hpp
@@ -67,6 +67,7 @@ class WindowAggregationOperationState {
* tables.
*/
WindowAggregationOperationState(const CatalogRelationSchema &input_relation,
+ std::vector<block_id> &&block_ids,
const WindowAggregateFunction *window_aggregate_function,
std::vector<std::unique_ptr<const Scalar>> &&arguments,
std::vector<std::unique_ptr<const Scalar>> &&partition_by_attributes,
@@ -112,11 +113,11 @@ class WindowAggregationOperationState {
* @param output_destination The output destination for the computed window
* aggregate.
**/
- void windowAggregateBlocks(InsertDestination *output_destination,
- const std::vector<block_id> &block_ids);
+ void windowAggregateBlocks(InsertDestination *output_destination);
private:
const CatalogRelationSchema &input_relation_;
+ const std::vector<block_id> block_ids_;
// TODO(Shixuan): Handle and State for window aggregation will be needed for
// actual calculation.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5f535ee/storage/WindowAggregationOperationState.proto
----------------------------------------------------------------------
diff --git a/storage/WindowAggregationOperationState.proto b/storage/WindowAggregationOperationState.proto
index 4dc0a6a..c3b672c 100644
--- a/storage/WindowAggregationOperationState.proto
+++ b/storage/WindowAggregationOperationState.proto
@@ -24,6 +24,7 @@ import "expressions/Expressions.proto";
message WindowAggregationOperationState {
required int32 input_relation_id = 1;
+ repeated fixed64 block_ids = 2;
required WindowAggregateFunction function = 3;
repeated Scalar arguments = 4;
repeated Scalar partition_by_attributes = 5;
[2/3] incubator-quickstep git commit: Added handle for avg
Posted by sh...@apache.org.
Added handle for avg
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/29664050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/29664050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/29664050
Branch: refs/heads/SQL-window-aggregation
Commit: 2966405066b223d0079f8df3e231ed56750259ad
Parents: 7671a58
Author: shixuan-fan <sh...@apache.org>
Authored: Fri Jul 8 18:23:47 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Fri Jul 8 18:23:47 2016 +0000
----------------------------------------------------------------------
expressions/CMakeLists.txt | 1 +
expressions/window_aggregation/CMakeLists.txt | 85 +++
.../WindowAggregateFunction.cpp | 46 ++
.../WindowAggregateFunction.hpp | 149 ++++++
.../WindowAggregateFunction.proto | 25 +
.../WindowAggregateFunctionAvg.cpp | 85 +++
.../WindowAggregateFunctionAvg.hpp | 75 +++
.../WindowAggregateFunctionFactory.cpp | 78 +++
.../WindowAggregateFunctionFactory.hpp | 102 ++++
.../WindowAggregationHandle.hpp | 130 +++++
.../WindowAggregationHandleAvg.cpp | 305 +++++++++++
.../WindowAggregationHandleAvg.hpp | 113 ++++
.../window_aggregation/WindowAggregationID.hpp | 44 ++
.../WindowAggregationHandleAvg_unittest.cpp | 526 +++++++++++++++++++
query_optimizer/CMakeLists.txt | 2 +
query_optimizer/ExecutionGenerator.cpp | 5 +-
query_optimizer/expressions/CMakeLists.txt | 2 +-
.../expressions/WindowAggregateFunction.cpp | 4 +-
.../expressions/WindowAggregateFunction.hpp | 10 +-
query_optimizer/resolver/CMakeLists.txt | 2 +
query_optimizer/resolver/Resolver.cpp | 40 +-
query_optimizer/resolver/Resolver.hpp | 5 +-
relational_operators/CMakeLists.txt | 1 +
.../WindowAggregationOperator.cpp | 10 +-
.../WindowAggregationOperator.hpp | 29 +-
relational_operators/WorkOrder.proto | 3 +-
storage/CMakeLists.txt | 11 +-
storage/WindowAggregationOperationState.cpp | 54 +-
storage/WindowAggregationOperationState.hpp | 53 +-
storage/WindowAggregationOperationState.proto | 16 +-
...WindowAggregationOperationState_unittest.cpp | 14 +-
31 files changed, 1897 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/CMakeLists.txt b/expressions/CMakeLists.txt
index 53ad5d4..6ef3c24 100644
--- a/expressions/CMakeLists.txt
+++ b/expressions/CMakeLists.txt
@@ -17,6 +17,7 @@ add_subdirectory(aggregation)
add_subdirectory(predicate)
add_subdirectory(scalar)
add_subdirectory(table_generator)
+add_subdirectory(window_aggregation)
QS_PROTOBUF_GENERATE_CPP(expressions_Expressions_proto_srcs
expressions_Expressions_proto_hdrs
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
new file mode 100644
index 0000000..fa4eae9
--- /dev/null
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -0,0 +1,85 @@
+# Copyright 2011-2015 Quickstep Technologies LLC.
+# Copyright 2015 Pivotal Software, Inc.
+# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+# University of Wisconsin—Madison.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+QS_PROTOBUF_GENERATE_CPP(expressions_windowaggregation_WindowAggregateFunction_proto_srcs
+ expressions_windowaggregation_WindowAggregateFunction_proto_hdrs
+ WindowAggregateFunction.proto)
+
+# Declare micro-libs:
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction
+ WindowAggregateFunction.cpp
+ WindowAggregateFunction.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ ${expressions_windowaggregation_WindowAggregateFunction_proto_srcs})
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ WindowAggregateFunctionAvg.cpp
+ WindowAggregateFunctionAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ WindowAggregateFunctionFactory.cpp
+ WindowAggregateFunctionFactory.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
+ ../../empty_src.cpp
+ WindowAggregationHandle.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ WindowAggregationHandleAvg.cpp
+ WindowAggregationHandleAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationID
+ ../../empty_src.cpp
+ WindowAggregationID.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandle
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_storage_HashTableBase
+ quickstep_types_TypedValue
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVector
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
+ quickstep_utility_Macros)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.cpp b/expressions/window_aggregation/WindowAggregateFunction.cpp
new file mode 100644
index 0000000..88ba0b9
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.cpp
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Serializes this function by translating its WindowAggregationID into the
+// matching protobuf enum value. An ID without a protobuf counterpart is a
+// programming error and aborts via LOG(FATAL).
+serialization::WindowAggregateFunction WindowAggregateFunction::getProto() const {
+  using UnderlyingID = std::underlying_type<WindowAggregationID>::type;
+
+  serialization::WindowAggregateFunction serialized;
+  switch (win_agg_id_) {
+    case WindowAggregationID::kAvg: {
+      serialized.set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
+      break;
+    }
+    default:
+      LOG(FATAL) << "Unrecognized WindowAggregationID: "
+                 << static_cast<UnderlyingID>(win_agg_id_);
+  }
+
+  return serialized;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
new file mode 100644
index 0000000..9cc5d74
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregationHandle;
+class Type;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief A class representing a particular window aggregate function in the
+ * abstract sense. Each named aggregate function is represented by a
+ * singleton subclass of WindowAggregateFunction.
+ *
+ * WindowAggregateFunction provides informational methods about the
+ * applicability of a particular window aggregate function to particular
+ * argument Type(s). The actual implementation of the window aggregate
+ * functions' logic is in the WindowAggregationHandle class hierarchy, and can
+ * be different depending on the particular argument Type(s) given to the window
+ * aggregate. To perform a window aggregation, a caller should first call
+ * WindowAggregateFunction::createHandle() to instantiate a
+ * WindowAggregationHandle object, then use the methods of
+ * WindowAggregationHandle to do the actual window aggregation. Finally, delete
+ * the WindowAggregationHandle after finished.
+ *
+ * See WindowAggregationHandle for more detailed information about how
+ * window aggregates are actually computed.
+ **/
+class WindowAggregateFunction {
+ public:
+ /**
+ * @brief Get the ID of this window aggregate (i.e. its unique ID amongst all
+ * the WindowAggregateFunctions).
+ *
+ * @return The WindowAggregationID of this WindowAggregateFunction.
+ **/
+ inline WindowAggregationID getWindowAggregationID() const {
+ return win_agg_id_;
+ }
+
+ /**
+ * @brief Get the human-readable name of this WindowAggregateFunction.
+ *
+ * @return The human-readable name of this WindowAggregateFunction.
+ **/
+ virtual std::string getName() const = 0;
+
+ /**
+ * @brief Get the serialized protocol buffer representation of this
+ * WindowAggregateFunction.
+ *
+ * @return A serialized protocol buffer representation of this
+ * WindowAggregateFunction.
+ **/
+ virtual serialization::WindowAggregateFunction getProto() const;
+
+ /**
+ * @brief Determine if this WindowAggregateFunction can be applied to
+ * arguments of particular Type(s).
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @return Whether this WindowAggregateFunction is applicable to the given
+ * argument_types.
+ **/
+ virtual bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const = 0;
+
+ /**
+ * @brief Determine the result Type for this WindowAggregateFunction given
+ * arguments of particular Type(s).
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @return The result Type for this WindowAggregateFunction applied to the
+ * specified argument_types, or nullptr if this
+ * WindowAggregateFunction is not applicable to the specified Type(s).
+ **/
+ virtual const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const = 0;
+
+ /**
+ * @brief Create a WindowAggregationHandle to compute aggregates.
+ *
+ * @warning It is an error to call this method for argument_types which this
+ * WindowAggregateFunction can not apply to. For safety, check
+ * canApplyToTypes() first.
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @param partition_key_types The Types of the partition keys (presumably
+ * in PARTITION BY order - confirm against callers).
+ *
+ * @return A new WindowAggregationHandle that can be used to compute this
+ * WindowAggregateFunction over the specified argument_types. Caller
+ * is responsible for deleting the returned object.
+ **/
+ virtual WindowAggregationHandle* createHandle(
+ std::vector<const Type*> &&argument_types,
+ std::vector<const Type*> &&partition_key_types) const = 0;
+
+ protected:
+ /**
+ * @brief Constructor, only callable by subclasses.
+ *
+ * @param win_agg_id The WindowAggregationID identifying the concrete
+ * function; returned by getWindowAggregationID().
+ **/
+ explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)
+ : win_agg_id_(win_agg_id) {
+ }
+
+ private:
+ // ID of the concrete window aggregate function, fixed at construction.
+ const WindowAggregationID win_agg_id_;
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunction);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.proto
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.proto b/expressions/window_aggregation/WindowAggregateFunction.proto
new file mode 100644
index 0000000..173ff0e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.proto
@@ -0,0 +1,25 @@
+// Copyright 2015 Pivotal Software, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+// Serialized form of a quickstep::WindowAggregateFunction. The function is
+// identified solely by its ID.
+message WindowAggregateFunction {
+ // Protobuf counterpart of the C++ WindowAggregationID enum
+ // (WindowAggregationID::kAvg is serialized as AVG).
+ enum WindowAggregationID {
+ AVG = 0;
+ }
+
+ // Which window aggregate function this message represents.
+ required WindowAggregationID window_aggregation_id = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
new file mode 100644
index 0000000..e9a4453
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// AVG is applicable to exactly one argument whose Type can be added to itself
+// and divided by a DOUBLE.
+bool WindowAggregateFunctionAvg::canApplyToTypes(
+    const std::vector<const Type*> &argument_types) const {
+  // AVG takes exactly one argument.
+  if (argument_types.size() != 1) {
+    return false;
+  }
+
+  const Type &argument_type = *argument_types.front();
+
+  // The argument must be addable to itself ...
+  const bool addable =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+          .canApplyToTypes(argument_type, argument_type);
+  if (!addable) {
+    return false;
+  }
+
+  // ... and divisible by a DOUBLE.
+  return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+      .canApplyToTypes(argument_type, TypeFactory::GetType(kDouble));
+}
+
+// Returns the Type produced by dividing the (possibly widened) nullable sum
+// type by a DOUBLE, or nullptr when AVG is not applicable to argument_types.
+const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
+    const std::vector<const Type*> &argument_types) const {
+  if (!canApplyToTypes(argument_types)) {
+    return nullptr;
+  }
+
+  // Sums are accumulated in a nullable type. INT is widened to LONG and FLOAT
+  // to DOUBLE so that adding up many values has more headroom.
+  const Type &nullable_argument = argument_types.front()->getNullableVersion();
+  const Type *accumulator_type = &nullable_argument;
+  const auto argument_type_id = nullable_argument.getTypeID();
+  if (argument_type_id == kInt) {
+    accumulator_type = &TypeFactory::GetType(kLong, true);
+  } else if (argument_type_id == kFloat) {
+    accumulator_type = &TypeFactory::GetType(kDouble, true);
+  }
+
+  const Type &divisor_type = TypeFactory::GetType(kDouble);
+  return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+      .resultTypeForArgumentTypes(*accumulator_type, divisor_type);
+}
+
+// Builds a heap-allocated WindowAggregationHandleAvg for the single AVG
+// argument; the partition key types are moved into the handle. Applicability
+// of argument_types is only verified in debug builds (DCHECK).
+WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+    std::vector<const Type*> &&argument_types,
+    std::vector<const Type*> &&partition_key_types) const {
+  DCHECK(canApplyToTypes(argument_types))
+      << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
+      << " that AVG can not be applied to.";
+
+  const Type &avg_argument_type = *argument_types.front();
+  return new WindowAggregationHandleAvg(avg_argument_type,
+                                        std::move(partition_key_types));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
new file mode 100644
index 0000000..18e1022
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregationHandle;
+class Type;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL AVG() OVER term.
+ **/
+class WindowAggregateFunctionAvg : public WindowAggregateFunction {
+ public:
+ /**
+ * @brief Get the single shared instance of this class.
+ *
+ * @return A const reference to a function-local static instance.
+ **/
+ static const WindowAggregateFunctionAvg& Instance() {
+ static WindowAggregateFunctionAvg instance;
+ return instance;
+ }
+
+ /**
+ * @brief Get the human-readable name of this function.
+ *
+ * @return The string "AVG".
+ **/
+ std::string getName() const override {
+ return "AVG";
+ }
+
+ // The remaining overrides are defined out of line; see the documentation
+ // on the corresponding WindowAggregateFunction virtual methods.
+ bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ WindowAggregationHandle* createHandle(
+ std::vector<const Type*> &&argument_types,
+ std::vector<const Type*> &&partition_key_types) const override;
+
+ private:
+ // Private so that the only way to obtain an instance is Instance().
+ WindowAggregateFunctionAvg()
+ : WindowAggregateFunction(WindowAggregationID::kAvg) {
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionAvg);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
new file mode 100644
index 0000000..afd53ef
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+
+#include <string>
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Maps a WindowAggregationID to the singleton of the matching concrete
+// WindowAggregateFunction. An unrecognized ID aborts via LOG(FATAL).
+const WindowAggregateFunction& WindowAggregateFunctionFactory::Get(
+    const WindowAggregationID agg_id) {
+  using UnderlyingID = std::underlying_type<WindowAggregationID>::type;
+
+  switch (agg_id) {
+    case WindowAggregationID::kAvg: {
+      return WindowAggregateFunctionAvg::Instance();
+    }
+    default:
+      LOG(FATAL) << "Unrecognized WindowAggregationID: "
+                 << static_cast<UnderlyingID>(agg_id);
+  }
+}
+
+// Looks up a window aggregate function by name. The comparison is exact, and
+// "avg" is the only name recognized; anything else yields nullptr.
+const WindowAggregateFunction* WindowAggregateFunctionFactory::GetByName(
+    const std::string &name) {
+  if (name != "avg") {
+    return nullptr;
+  }
+  return &WindowAggregateFunctionAvg::Instance();
+}
+
+// A proto is valid when it is fully initialized and its ID is a value known to
+// the protobuf WindowAggregationID enum.
+bool WindowAggregateFunctionFactory::ProtoIsValid(
+    const serialization::WindowAggregateFunction &proto) {
+  if (!proto.IsInitialized()) {
+    return false;
+  }
+  return serialization::WindowAggregateFunction::WindowAggregationID_IsValid(
+      proto.window_aggregation_id());
+}
+
+// Returns the singleton WindowAggregateFunction named by the proto's ID.
+// Validity is checked with DCHECK (debug builds only); an ID with no matching
+// function aborts via LOG(FATAL).
+const WindowAggregateFunction& WindowAggregateFunctionFactory::ReconstructFromProto(
+    const serialization::WindowAggregateFunction &proto) {
+  DCHECK(ProtoIsValid(proto))
+      << "Attempted to reconstruct an WindowAggregateFunction from an invalid proto:\n"
+      << proto.DebugString();
+
+  const auto serialized_id = proto.window_aggregation_id();
+  switch (serialized_id) {
+    case serialization::WindowAggregateFunction::AVG: {
+      return WindowAggregateFunctionAvg::Instance();
+    }
+    default:
+      LOG(FATAL) << "Unrecognized serialization::WindowAggregateFunction::WindowAggregationID: "
+                 << serialized_id
+                 << "\nFull proto debug string:\n"
+                 << proto.DebugString();
+  }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
new file mode 100644
index 0000000..2254482
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+
+#include <string>
+
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregateFunction;
+namespace serialization { class WindowAggregateFunction; }
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief All-static factory class that provides access to the various concrete
+ * implementations of WindowAggregateFunction.
+ *
+ * WindowAggregateFunctionFactory allows client code to use any
+ * WindowAggregateFunction in Quickstep in a generic way without having to know
+ * about all the specific subclasses of WindowAggregateFunction. In particular,
+ * it is used to deserialize WindowAggregateFunctions used in
+ * WindowAggregationOperationState from their protobuf representations
+ * (originally created by the optimizer) when deserializing a QueryContext.
+ **/
+class WindowAggregateFunctionFactory {
+ public:
+ /**
+ * @brief Get a particular WindowAggregateFunction by its ID.
+ *
+ * @note An unrecognized agg_id is treated as a fatal error (LOG(FATAL)).
+ *
+ * @param agg_id The ID of the desired WindowAggregateFunction.
+ * @return A reference to the singleton instance of the
+ * WindowAggregateFunction specified by agg_id.
+ **/
+ static const WindowAggregateFunction& Get(const WindowAggregationID agg_id);
+
+ /**
+ * @brief Get a particular WindowAggregateFunction by its name in SQL syntax.
+ *
+ * @param name The name of the desired WindowAggregateFunction in lower case.
+ * The comparison is exact, so callers must lower-case the name first.
+ * @return A pointer to the WindowAggregateFunction specified by name, or
+ * nullptr if name does not match any known WindowAggregateFunction.
+ **/
+ static const WindowAggregateFunction* GetByName(const std::string &name);
+
+ /**
+ * @brief Determine if a serialized protobuf representation of a
+ * WindowAggregateFunction is fully-formed and valid.
+ *
+ * @param proto A serialized protobuf representation of a
+ * WindowAggregateFunction to check for validity.
+ * @return Whether proto is fully-formed and valid.
+ **/
+ static bool ProtoIsValid(const serialization::WindowAggregateFunction &proto);
+
+ /**
+ * @brief Get the WindowAggregateFunction represented by a proto.
+ *
+ * @warning It is an error to call this method with an invalid proto.
+ * ProtoIsValid() should be called first to check. Validity is only
+ * DCHECKed here; an ID with no matching function is a fatal error.
+ *
+ * @param proto A serialized protobuf representation of a
+ * WindowAggregateFunction.
+ * @return The WindowAggregateFunction represented by proto.
+ **/
+ static const WindowAggregateFunction& ReconstructFromProto(
+ const serialization::WindowAggregateFunction &proto);
+
+ private:
+ // Class is all-static and can not be instantiated: the constructor is
+ // private and only declared here.
+ WindowAggregateFunctionFactory();
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionFactory);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
new file mode 100644
index 0000000..77b1e76
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class ColumnVector;
+class InsertDestinationInterface;
+class Scalar;
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregationHandle encapsulates logic for actually computing
+ * window aggregates with particular argument(s).
+ * @note See also WindowAggregateFunction, which represents a SQL aggregate
+ * function in the abstract sense.
+ *
+ * A WindowAggregationHandle is created by calling
+ * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
+ * provides methods that are used to actually compute the window aggregate,
+ * storing intermediate results in WindowAggregationState objects.
+ *
+ * The work flow for computing a window aggregate is:
+ * 1. Create an initial state by createInitialState().
+ * 2. One thread will handle all the computation, iterating from the first
+ * tuple to the last tuple. Note there will be two modes that could be
+ * used upon different situations:
+ * a. If the window aggregate is defined as accumulative, which are:
+ * i. Functions applied to whole partition, such as rank(), ntile()
+ * and dense_rank().
+ * ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
+ * AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
+ * FOLLOWING".
+ * Then, for functions except median, we could store some global
+ * values in the state without keeping all the tuple values around.
+ * b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
+ * 3 FOLLOWING", we have to store all the tuples in the state so that
+ * we could know which values should be dropped as the window slides.
+ * For each computed value, generate a tuple store in the column vector.
+ * 3. Insert the new column into the original relation and return.
+ *
+ * TODO(Shixuan): Currently we don't support parallelization. The basic idea for
+ * parallelization is to calculate the partial result inside each block. Each
+ * block could visit the following blocks as long as the block's last partition
+ * is not finished. WindowAggregationOperationState will be used for handling
+ * the global state of the calculation.
+ **/
+
+class WindowAggregationHandle {
+ public:
+ /**
+ * @brief Destructor.
+ **/
+ virtual ~WindowAggregationHandle() {}
+
+ /**
+ * @brief Calculate the window aggregate result.
+ *
+ * @param arguments The argument expressions (Scalars) of the window
+ * aggregate.
+ * @param block_ids The IDs of the storage blocks to process (presumably
+ * the blocks of the input relation - confirm in implementations).
+ * @param partition_by_ids The attribute_ids of the partition keys.
+ * @param relation The relation schema (presumably that of the input
+ * relation - confirm in implementations).
+ * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+ * @param num_preceding The number of rows/range that precedes the current row.
+ * @param num_following The number of rows/range that follows the current row.
+ * @param storage_manager A pointer to the storage manager.
+ * @param output_destination The destination for output.
+ *
+ * NOTE(review): std::int64_t is used below, but <cstdint> is not among this
+ * header's direct includes - confirm it is pulled in transitively or add it.
+ **/
+ virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+ const std::vector<block_id> &block_ids,
+ const std::vector<attribute_id> &partition_by_ids,
+ const CatalogRelationSchema &relation,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following,
+ StorageManager *storage_manager,
+ InsertDestinationInterface *output_destination) const = 0;
+
+ protected:
+ /**
+ * @brief Constructor. Takes no arguments; the partition keys, frame
+ * definition and storage manager are passed to calculate() instead.
+ **/
+ WindowAggregationHandle() {}
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
new file mode 100644
index 0000000..62f5a88
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -0,0 +1,305 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "storage/InsertDestinationInterface.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+    const Type &type,
+    std::vector<const Type*> &&partition_key_types)
+    : argument_type_(type) {
+  // Pick the accumulator type: Int widens to Long and Float widens to Double
+  // to leave headroom when many values are added; every other argument type
+  // is summed in its own type.
+  const TypeID argument_type_id = type.getTypeID();
+  TypeID sum_type_id = argument_type_id;
+  if (argument_type_id == kInt || argument_type_id == kLong) {
+    sum_type_id = kLong;
+  } else if (argument_type_id == kFloat || argument_type_id == kDouble) {
+    sum_type_id = kDouble;
+  }
+  sum_type_ = &(TypeFactory::GetType(sum_type_id));
+
+  // The result type is whatever "sum / double" yields, made nullable because
+  // AVG() over 0 values (or all NULL values) is NULL.
+  const BinaryOperation &divide_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+  const Type *quotient_type =
+      divide_operation.resultTypeForArgumentTypes(*sum_type_,
+                                                  TypeFactory::GetType(kDouble));
+  result_type_ = &(quotient_type->getNullableVersion());
+
+  // Operator that adds one argument value to the running sum.
+  const BinaryOperation &add_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd);
+  fast_add_operator_.reset(
+      add_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+
+  // Operator that divides the sum by the (double) count to get the average.
+  divide_operator_.reset(
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+          .makeUncheckedBinaryOperatorForTypes(*sum_type_, TypeFactory::GetType(kDouble)));
+
+  // One equality comparator per partition key, used to decide whether two
+  // tuples belong to the same partition.
+  for (std::size_t key_index = 0; key_index < partition_key_types.size(); ++key_index) {
+    const Type &key_type = *partition_key_types[key_index];
+    equal_comparators_.emplace_back(
+        ComparisonFactory::GetComparison(ComparisonID::kEqual)
+            .makeUncheckedComparatorForTypes(key_type, key_type));
+  }
+}
+
+// Computes the AVG window aggregate for every tuple stored in 'block_ids' and
+// inserts each input tuple, extended with its aggregate as the last value,
+// into 'output_destination'. Exactly one argument expression is expected.
+void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                                           const std::vector<block_id> &block_ids,
+                                           const std::vector<attribute_id> &partition_by_ids,
+                                           const CatalogRelationSchema &relation,
+                                           const bool is_row,
+                                           const std::int64_t num_preceding,
+                                           const std::int64_t num_following,
+                                           StorageManager *storage_manager,
+                                           InsertDestinationInterface *output_destination) const {
+  DCHECK(arguments.size() == 1);
+  DCHECK(!block_ids.empty());
+
+  // Owners. 'blocks' is declared first so that it is destroyed last: every
+  // block stays referenced for as long as an accessor may read from it.
+  std::vector<BlockReference> blocks;
+  std::vector<std::unique_ptr<ValueAccessor>> owned_tuple_accessors;
+  std::vector<std::unique_ptr<ColumnVectorsValueAccessor>> owned_argument_accessors;
+
+  // Non-owning views in the shape calculateOneWindow() expects. The index of
+  // each accessor is the index of its block in 'block_ids'.
+  std::vector<ValueAccessor*> tuple_accessors;
+  std::vector<ColumnVectorsValueAccessor*> argument_accessors;
+
+  blocks.reserve(block_ids.size());
+  owned_tuple_accessors.reserve(block_ids.size());
+  owned_argument_accessors.reserve(block_ids.size());
+  tuple_accessors.reserve(block_ids.size());
+  argument_accessors.reserve(block_ids.size());
+
+  for (const block_id bid : block_ids) {
+    blocks.emplace_back(storage_manager->getBlock(bid, relation));
+    const BlockReference &block = blocks.back();
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+
+    // Accessor over the stored tuples of this block.
+    owned_tuple_accessors.emplace_back(tuple_block.createValueAccessor());
+    ValueAccessor *tuple_accessor = owned_tuple_accessors.back().get();
+    tuple_accessors.push_back(tuple_accessor);
+
+    // Accessor over the evaluated argument values of this block. It must be
+    // heap-allocated: a loop-local object would be destroyed at the end of
+    // the iteration and leave a dangling pointer in 'argument_accessors'.
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+    owned_argument_accessors.emplace_back(new ColumnVectorsValueAccessor());
+    ColumnVectorsValueAccessor *argument_accessor = owned_argument_accessors.back().get();
+    argument_accessor->addColumn(
+        arguments.front()->getAllValues(tuple_accessor, &sub_block_ref));
+    argument_accessors.push_back(argument_accessor);
+  }
+
+  // Create a window for each tuple and calculate the window aggregate.
+  for (std::uint32_t current_block_index = 0;
+       current_block_index < block_ids.size();
+       ++current_block_index) {
+    ColumnVectorsValueAccessor *argument_accessor =
+        argument_accessors[current_block_index];
+
+    InvokeOnAnyValueAccessor(
+        tuple_accessors[current_block_index],
+        [&](auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
+          tuple_accessor->beginIteration();
+          argument_accessor->beginIteration();
+
+          while (tuple_accessor->next() && argument_accessor->next()) {
+            const TypedValue window_aggregate =
+                this->calculateOneWindow(tuple_accessors,
+                                         argument_accessors,
+                                         partition_by_ids,
+                                         current_block_index,
+                                         is_row,
+                                         num_preceding,
+                                         num_following);
+
+            // getTuple() is taken to return a newly allocated Tuple owned by
+            // the caller, so it is released here instead of being leaked.
+            // NOTE(review): confirm the ownership contract in ValueAccessor.
+            std::unique_ptr<Tuple> current_tuple(tuple_accessor->getTuple());
+
+            // Output tuple = all input values followed by the aggregate.
+            std::vector<TypedValue> new_tuple;
+            for (const TypedValue &value : *current_tuple) {
+              new_tuple.push_back(value);
+            }
+            new_tuple.push_back(window_aggregate);
+
+            output_destination->insertTupleInBatch(Tuple(std::move(new_tuple)));
+          }
+        });
+  }
+}
+
+// Computes AVG over the window of the tuple that the accessors of block
+// 'current_block_index' are currently positioned on. The window holds the
+// current row plus up to 'num_preceding' rows before it and up to
+// 'num_following' rows after it (-1 means unbounded), never crossing a
+// partition boundary. NULL argument values are ignored; the result is NULL
+// if the window contains no non-NULL value.
+//
+// NOTE: 'is_row' is currently not consulted; frames are always counted in
+// rows.
+TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+    std::vector<ValueAccessor*> &tuple_accessors,
+    std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
+    const std::vector<attribute_id> &partition_by_ids,
+    const std::uint32_t current_block_index,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  ValueAccessor *const current_tuple_accessor = tuple_accessors[current_block_index];
+  ColumnVectorsValueAccessor *const current_argument_accessor =
+      argument_accessors[current_block_index];
+
+  // Running sum and number of non-NULL values seen so far.
+  TypedValue sum = sum_type_->makeZeroValue();
+  std::uint64_t count = 0;
+
+  // Start with the current row.
+  const TypedValue current_value = current_argument_accessor->getTypedValue(0);
+  if (!current_value.isNull()) {
+    sum = fast_add_operator_->applyToTypedValues(sum, current_value);
+    ++count;
+  }
+
+  // Get the partition key for the current row.
+  std::vector<TypedValue> current_row_partition_key;
+  for (const attribute_id partition_by_id : partition_by_ids) {
+    current_row_partition_key.push_back(
+        current_tuple_accessor->getTypedValueVirtual(partition_by_id));
+  }
+
+  // Get current position.
+  const tuple_id current_tuple_id = current_tuple_accessor->getCurrentPositionVirtual();
+
+  // ---- Preceding tuples -------------------------------------------------
+  {
+    ValueAccessor *tuple_accessor = current_tuple_accessor;
+    ColumnVectorsValueAccessor *argument_accessor = current_argument_accessor;
+    // The block index is kept unsigned and tested against 0 *before* it is
+    // decremented, so it can never wrap around.
+    std::size_t preceding_block_index = current_block_index;
+    tuple_id preceding_tuple_id = current_tuple_id;
+    std::int64_t count_preceding = 0;
+
+    while (num_preceding == -1 || count_preceding < num_preceding) {
+      --preceding_tuple_id;
+
+      // Walk back to the last tuple of the closest non-empty earlier block
+      // when the start of the current one has been passed.
+      bool reached_first_tuple = false;
+      while (preceding_tuple_id < 0) {
+        if (preceding_block_index == 0) {
+          // First tuple of the first block, no more preceding blocks.
+          reached_first_tuple = true;
+          break;
+        }
+        --preceding_block_index;
+        tuple_accessor = tuple_accessors[preceding_block_index];
+        argument_accessor = argument_accessors[preceding_block_index];
+        preceding_tuple_id = argument_accessor->getNumTuples() - 1;
+      }
+      if (reached_first_tuple) {
+        break;
+      }
+
+      // Stop at the first tuple that belongs to a different partition.
+      if (!samePartition(current_row_partition_key,
+                         tuple_accessor,
+                         preceding_tuple_id,
+                         partition_by_ids)) {
+        break;
+      }
+
+      // The row is part of the frame; it contributes to the average only if
+      // its argument value is not NULL.
+      ++count_preceding;
+      const TypedValue preceding_value =
+          argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id);
+      if (!preceding_value.isNull()) {
+        sum = fast_add_operator_->applyToTypedValues(sum, preceding_value);
+        ++count;
+      }
+    }
+  }
+
+  // ---- Following tuples -------------------------------------------------
+  {
+    // Restart from the current block: the preceding scan above may have
+    // ended on the accessors of an earlier block.
+    ValueAccessor *tuple_accessor = current_tuple_accessor;
+    ColumnVectorsValueAccessor *argument_accessor = current_argument_accessor;
+    std::size_t following_block_index = current_block_index;
+    tuple_id following_tuple_id = current_tuple_id;
+    std::int64_t count_following = 0;
+
+    while (num_following == -1 || count_following < num_following) {
+      ++following_tuple_id;
+
+      // Walk forward to the first tuple of the closest non-empty later block
+      // when the end of the current one has been passed.
+      bool reached_last_tuple = false;
+      while (following_tuple_id >= argument_accessor->getNumTuples()) {
+        if (following_block_index + 1 >= tuple_accessors.size()) {
+          // Last tuple of the last block, no more following blocks.
+          reached_last_tuple = true;
+          break;
+        }
+        ++following_block_index;
+        tuple_accessor = tuple_accessors[following_block_index];
+        argument_accessor = argument_accessors[following_block_index];
+        following_tuple_id = 0;
+      }
+      if (reached_last_tuple) {
+        break;
+      }
+
+      // Stop at the first tuple that belongs to a different partition.
+      if (!samePartition(current_row_partition_key,
+                         tuple_accessor,
+                         following_tuple_id,
+                         partition_by_ids)) {
+        break;
+      }
+
+      ++count_following;
+      const TypedValue following_value =
+          argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id);
+      if (!following_value.isNull()) {
+        sum = fast_add_operator_->applyToTypedValues(sum, following_value);
+        ++count;
+      }
+    }
+  }
+
+  // AVG() over no (non-NULL) values is NULL; otherwise it is sum / count.
+  if (count == 0) {
+    return result_type_->makeNullValue();
+  }
+  return divide_operator_->applyToTypedValues(sum,
+                                              TypedValue(static_cast<double>(count)));
+}
+
+// Returns true if the tuple at absolute position 'boundary_tuple_id' of
+// 'tuple_accessor' has the same value as 'current_row_partition_key' for every
+// attribute in 'partition_by_ids' (compared pairwise with the equality
+// comparators built in the constructor). With no partition keys every tuple
+// is in the same partition.
+bool WindowAggregationHandleAvg::samePartition(
+    const std::vector<TypedValue> &current_row_partition_key,
+    ValueAccessor *tuple_accessor,
+    const tuple_id boundary_tuple_id,
+    const std::vector<attribute_id> &partition_by_ids) const {
+  return InvokeOnAnyValueAccessor(
+      tuple_accessor,
+      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
+        for (std::uint32_t partition_by_index = 0;
+             partition_by_index < partition_by_ids.size();
+             ++partition_by_index) {
+          // A single differing key is enough to tell the partitions apart.
+          if (!equal_comparators_[partition_by_index]->compareTypedValues(
+                  current_row_partition_key[partition_by_index],
+                  accessor->getTypedValueAtAbsolutePosition(
+                      partition_by_ids[partition_by_index], boundary_tuple_id))) {
+            return false;
+          }
+        }
+
+        return true;
+      });
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
new file mode 100644
index 0000000..115152e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVector;
+class ColumnVectorsValueAccessor;
+class InsertDestinationInterface;
+class StorageManager;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief A WindowAggregationHandle for average.
+ **/
+class WindowAggregationHandleAvg : public WindowAggregationHandle {
+ public:
+  ~WindowAggregationHandleAvg() override {}
+
+  /**
+   * @brief Calculate the average over the window of every input tuple and
+   *        insert each tuple, followed by its window aggregate, into the
+   *        output destination.
+   *
+   * @param arguments The argument expressions; exactly one is expected.
+   * @param block_ids The IDs of the blocks that hold the input tuples.
+   * @param partition_by_ids The attribute_id of each partition key.
+   * @param relation The schema of the relation that the blocks belong to.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current
+   *        row, -1 if unbounded.
+   * @param num_following The number of rows/range that follows the current
+   *        row, -1 if unbounded.
+   * @param storage_manager A pointer to the storage manager.
+   * @param output_destination The destination for output.
+   **/
+  void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                 const std::vector<block_id> &block_ids,
+                 const std::vector<attribute_id> &partition_by_ids,
+                 const CatalogRelationSchema &relation,
+                 const bool is_row,
+                 const std::int64_t num_preceding,
+                 const std::int64_t num_following,
+                 StorageManager *storage_manager,
+                 InsertDestinationInterface *output_destination) const override;
+
+ private:
+  // Handles are created by the corresponding window aggregate function.
+  friend class WindowAggregateFunctionAvg;
+
+  /**
+   * @brief Constructor.
+   *
+   * @param type Type of the avg value.
+   * @param partition_key_types The types of the partition keys, in the same
+   *        order as the partition_by_ids passed to calculate().
+   **/
+  explicit WindowAggregationHandleAvg(const Type &type,
+                                      std::vector<const Type*> &&partition_key_types);
+
+  /**
+   * @brief Calculate the average over the window of the tuple that the
+   *        accessors of block current_block_index are positioned on.
+   *
+   * @param tuple_accessors One tuple accessor per block.
+   * @param argument_accessors One argument-value accessor per block.
+   * @param partition_by_ids The attribute_id of each partition key.
+   * @param current_block_index The index (into the accessor vectors) of the
+   *        block that holds the current tuple.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current
+   *        row, -1 if unbounded.
+   * @param num_following The number of rows/range that follows the current
+   *        row, -1 if unbounded.
+   * @return The window aggregate for the current tuple.
+   **/
+  TypedValue calculateOneWindow(
+      std::vector<ValueAccessor*> &tuple_accessors,
+      std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
+      const std::vector<attribute_id> &partition_by_ids,
+      const std::uint32_t current_block_index,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const;
+
+  /**
+   * @brief Check whether the tuple at boundary_tuple_id in tuple_accessor has
+   *        the same partition key values as the current row.
+   *
+   * @param current_row_partition_key The partition key values of the current
+   *        row.
+   * @param tuple_accessor The accessor that holds the tuple to compare.
+   * @param boundary_tuple_id The absolute position of the tuple to compare.
+   * @param partition_by_ids The attribute_id of each partition key.
+   * @return True if all partition key values are equal.
+   **/
+  bool samePartition(const std::vector<TypedValue> &current_row_partition_key,
+                     ValueAccessor *tuple_accessor,
+                     const tuple_id boundary_tuple_id,
+                     const std::vector<attribute_id> &partition_by_ids) const;
+
+  const Type &argument_type_;  // Type of the values being averaged.
+  const Type *sum_type_;       // Type used for the running sum.
+  const Type *result_type_;    // Nullable type of sum / count.
+  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+  // One equality comparator per partition key.
+  std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationID.hpp b/expressions/window_aggregation/WindowAggregationID.hpp
new file mode 100644
index 0000000..74c948f
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationID.hpp
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+
+namespace quickstep {
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief Identifiers for the kinds of window aggregation. The enumerators
+ *        keep their declaration order and therefore their underlying values.
+ **/
+enum class WindowAggregationID {
+  kAvg = 0,
+  kCount,
+  kMin,
+  kMax,
+  kSum,
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..6a7d161
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,526 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageManager.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+ constexpr int kNumSamples = 100;
+
+} // namespace
+
+// Test fixture for WindowAggregationHandleAvg.
+//
+// NOTE(review): apart from initializeHandle(), CanApplyToTypesTest() and
+// ResultTypeForArgumentTypesTest(), the helpers below still exercise the
+// (non-window) AggregationHandle interface: finalize(), mergeStates(),
+// accumulateColumnVectors(), accumulateValueAccessor() and iterateHandle(),
+// none of which is declared in this file or in the headers it includes. They
+// need to be ported to WindowAggregationHandle::calculate() before this file
+// can build.
+class WindowAggregationHandleAvgTest : public ::testing::Test {
+ protected:
+  // Creates the AVG window aggregation handle for 'argument_type' and keeps it
+  // in handle_avg_. 'partition_key_types' defaults to "no partition keys".
+  // A copy of the key types is passed on so that the call works whether
+  // createHandle() takes them by value, const reference or rvalue reference
+  // (NOTE(review): its signature is not visible here).
+  void initializeHandle(const Type &argument_type,
+                        const std::vector<const Type*> &partition_key_types = {}) {
+    handle_avg_.reset(
+        WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).createHandle(
+            std::vector<const Type*>(1, &argument_type),
+            std::vector<const Type*>(partition_key_types)));
+  }
+
+  // Test canApplyToTypes(). Char and VarChar need a length parameter.
+  static bool CanApplyToTypesTest(TypeID typeID) {
+    const Type &type = (typeID == kChar || typeID == kVarChar) ?
+        TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
+        TypeFactory::GetType(typeID);
+
+    return WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).canApplyToTypes(
+        std::vector<const Type*>(1, &type));
+  }
+
+  // Test resultTypeForArgumentTypes().
+  static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+                                             TypeID output_type_id) {
+    const Type *result_type
+        = WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).resultTypeForArgumentTypes(
+            std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+    return (result_type->getTypeID() == output_type_id);
+  }
+
+  // Checks that finalizing 'state' with 'handle' yields 'expected'.
+  template <typename CppType>
+  static void CheckAvgValue(
+      CppType expected,
+      const AggregationHandle &handle,
+      const AggregationState &state) {
+    EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
+  }
+
+  // Static templated method for set a meaningful value to data types.
+  template <typename CppType>
+  static void SetDataType(int value, CppType *data) {
+    *data = value;
+  }
+
+  // Feeds kNumSamples values (surrounded by NULLs) one by one and checks the
+  // average, then repeats through mergeStates().
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkAggregationAvgGeneric() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+    typename GenericType::cpptype val;
+    typename GenericType::cpptype sum;
+    SetDataType(0, &sum);
+
+    iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
+    for (int i = 0; i < kNumSamples; ++i) {
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, &val);
+      } else {
+        SetDataType(static_cast<float>(i - 10)/10, &val);
+      }
+      iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val));
+      sum += val;
+    }
+    iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
+    CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+                                                *aggregation_handle_avg_,
+                                                *aggregation_handle_avg_state_);
+
+    // Test mergeStates().
+    std::unique_ptr<AggregationState> merge_state(
+        aggregation_handle_avg_->createInitialState());
+    aggregation_handle_avg_->mergeStates(*merge_state,
+                                         aggregation_handle_avg_state_.get());
+
+    iterateHandle(merge_state.get(), type.makeNullValue());
+    for (int i = 0; i < kNumSamples; ++i) {
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, &val);
+      } else {
+        SetDataType(static_cast<float>(i - 10)/10, &val);
+      }
+      iterateHandle(merge_state.get(), type.makeValue(&val));
+      sum += val;
+    }
+
+    aggregation_handle_avg_->mergeStates(*merge_state,
+                                         aggregation_handle_avg_state_.get());
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / (2 * kNumSamples),
+        *aggregation_handle_avg_,
+        *aggregation_handle_avg_state_);
+  }
+
+  // Builds a column of kNumSamples values with a NULL at the beginning, one
+  // in the middle and one at the end; '*sum' receives the sum of the non-NULL
+  // values.
+  template <typename GenericType>
+  ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) {
+    NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
+
+    typename GenericType::cpptype val;
+    SetDataType(0, sum);
+
+    column->appendTypedValue(type.makeNullValue());
+    for (int i = 0; i < kNumSamples; ++i) {
+      if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+        SetDataType(i - 10, &val);
+      } else {
+        SetDataType(static_cast<float>(i - 10)/10, &val);
+      }
+      column->appendTypedValue(type.makeValue(&val));
+      *sum += val;
+      // One NULL in the middle.
+      if (i == kNumSamples/2) {
+        column->appendTypedValue(type.makeNullValue());
+      }
+    }
+    column->appendTypedValue(type.makeNullValue());
+
+    return column;
+  }
+
+  // Same check as checkAggregationAvgGeneric(), fed through a ColumnVector.
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkAggregationAvgGenericColumnVector() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+    typename GenericType::cpptype sum;
+    SetDataType(0, &sum);
+    std::vector<std::unique_ptr<ColumnVector>> column_vectors;
+    column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum));
+
+    std::unique_ptr<AggregationState> cv_state(
+        aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
+
+    // Test the state generated directly by accumulateColumnVectors(), and also
+    // test after merging back.
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *cv_state);
+
+    aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get());
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *aggregation_handle_avg_state_);
+  }
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+  // Same check as checkAggregationAvgGeneric(), fed through a ValueAccessor.
+  template <typename GenericType, typename OutputType = DoubleType>
+  void checkAggregationAvgGenericValueAccessor() {
+    const GenericType &type = GenericType::Instance(true);
+    initializeHandle(type);
+    EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+    typename GenericType::cpptype sum;
+    SetDataType(0, &sum);
+    std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
+    accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum));
+
+    std::unique_ptr<AggregationState> va_state(
+        aggregation_handle_avg_->accumulateValueAccessor(accessor.get(),
+                                                         std::vector<attribute_id>(1, 0)));
+
+    // Test the state generated directly by accumulateValueAccessor(), and also
+    // test after merging back.
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *va_state);
+
+    aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get());
+    CheckAvgValue<typename OutputType::cpptype>(
+        static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+        *aggregation_handle_avg_,
+        *aggregation_handle_avg_state_);
+  }
+#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+
+  // The window aggregation handle created by initializeHandle().
+  std::unique_ptr<WindowAggregationHandle> handle_avg_;
+
+  // NOTE(review): leftovers of the AggregationHandle test, see the note at the
+  // top of the class.
+  std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
+  std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
+  std::unique_ptr<StorageManager> storage_manager_;
+};
+
+// kNumSamples is a namespace-scope constexpr (declared in the anonymous
+// namespace above), so it needs no out-of-class definition.
+
+// Doubles are compared with a tolerance rather than exactly.
+template <>
+void WindowAggregationHandleAvgTest::CheckAvgValue<double>(
+    double expected,
+    const AggregationHandle &handle,
+    const AggregationState &state) {
+  EXPECT_DOUBLE_EQ(expected, handle.finalize(state).getLiteral<double>());
+}
+
+// Interval literals are structs; set their single numeric field.
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
+  data->interval_ticks = value;
+}
+
+template <>
+void WindowAggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
+  data->months = value;
+}
+
+// Fixture alias used by the death tests.
+typedef WindowAggregationHandleAvgTest AggregationHandleAvgDeathTest;
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeTest) {
+  checkAggregationAvgGeneric<IntType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeTest) {
+  checkAggregationAvgGeneric<LongType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeTest) {
+  checkAggregationAvgGeneric<FloatType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeTest) {
+  checkAggregationAvgGeneric<DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeTest) {
+  checkAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeTest) {
+  checkAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, IntTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<IntType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<LongType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<FloatType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {
+  checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+TEST_F(WindowAggregationHandleAvgTest, IntTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<IntType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, LongTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<LongType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, FloatTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<FloatType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DoubleTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<DoubleType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(WindowAggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {
+  checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
+}
+#endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+
+#ifdef QUICKSTEP_DEBUG
+// Creating an AVG handle over a CHAR argument is expected to abort in debug
+// builds. initializeHandle() takes the partition key types as its second
+// argument; none are used here.
+TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) {
+  const Type &type = CharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type, std::vector<const Type*>()), "");
+}
+
+// Creating an AVG handle over a VARCHAR argument is expected to abort in debug
+// builds. initializeHandle() takes the partition key types as its second
+// argument; none are used here.
+TEST_F(AggregationHandleAvgDeathTest, VarTypeTest) {
+  const Type &type = VarCharType::Instance(true, 10);
+  EXPECT_DEATH(initializeHandle(type, std::vector<const Type*>()), "");
+}
+
+// Death test: feeding values of the wrong type, or merging states produced by
+// handles of a different argument type, is expected to abort.
+//
+// NOTE(review): this body still targets the (non-window) aggregation API. It
+// calls iterateHandle(), which the fixture in this file does not define, and
+// uses AggregateFunctionFactory, AggregationID, AggregationHandleAvg and
+// AggregationStateAvg, whose headers are not included by this file. It also
+// calls initializeHandle() with a single argument while the fixture declares
+// two parameters. It needs to be ported to the window aggregation API.
+TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) {
+ const Type &int_non_null_type = IntType::Instance(false);
+ const Type &long_type = LongType::Instance(true);
+ const Type &double_type = DoubleType::Instance(true);
+ const Type &float_type = FloatType::Instance(true);
+ const Type &char_type = CharType::Instance(true, 10);
+ const Type &varchar_type = VarCharType::Instance(true, 10);
+
+ initializeHandle(IntType::Instance(true));
+ int int_val = 0;
+ std::int64_t long_val = 0;
+ double double_val = 0;
+ float float_val = 0;
+
+ iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val));
+
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), "");
+
+ // Test mergeStates() with incorrectly typed handles.
+ std::unique_ptr<AggregationHandle> aggregation_handle_avg_double(
+ AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
+ std::vector<const Type*>(1, &double_type)));
+ std::unique_ptr<AggregationState> aggregation_state_avg_merge_double(
+ aggregation_handle_avg_double->createInitialState());
+ static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl(
+ static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()),
+ double_type.makeValue(&double_val));
+ EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,
+ aggregation_handle_avg_state_.get()),
+ "");
+
+ std::unique_ptr<AggregationHandle> aggregation_handle_avg_float(
+ AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
+ std::vector<const Type*>(1, &float_type)));
+ std::unique_ptr<AggregationState> aggregation_state_avg_merge_float(
+ aggregation_handle_avg_float->createInitialState());
+ static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl(
+ static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()),
+ float_type.makeValue(&float_val));
+ EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,
+ aggregation_handle_avg_state_.get()),
+ "");
+}
+#endif
+
+TEST_F(AggregationHandleAvgTest, canApplyToTypeTest) {  // Pins which argument TypeIDs ApplyToTypesTest() reports as usable for AVG.
+ EXPECT_TRUE(ApplyToTypesTest(kInt));  // NOTE(review): ApplyToTypesTest is a fixture helper defined outside this excerpt; presumably it wraps canApplyToTypes() - confirm.
+ EXPECT_TRUE(ApplyToTypesTest(kLong));
+ EXPECT_TRUE(ApplyToTypesTest(kFloat));
+ EXPECT_TRUE(ApplyToTypesTest(kDouble));  // All four numeric types above are expected to be accepted.
+ EXPECT_FALSE(ApplyToTypesTest(kChar));  // Character types and kDatetime are expected to be rejected.
+ EXPECT_FALSE(ApplyToTypesTest(kVarChar));
+ EXPECT_FALSE(ApplyToTypesTest(kDatetime));
+ EXPECT_TRUE(ApplyToTypesTest(kDatetimeInterval));  // Both interval types are expected to be accepted.
+ EXPECT_TRUE(ApplyToTypesTest(kYearMonthInterval));
+}
+
+TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {  // Pins the (argument TypeID, expected result TypeID) pairs for AVG.
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kInt, kDouble));  // NOTE(review): the helper is defined outside this excerpt; presumably it compares against resultTypeForArgumentTypes() - confirm.
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));  // Every numeric argument type above is paired with kDouble.
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));  // Interval argument types are paired with themselves.
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {  // Merges a source group-by hash table into a destination one and checks the merged AVG states.
+ const Type &long_non_null_type = LongType::Instance(false);
+ initializeHandle(long_non_null_type);
+ storage_manager_.reset(new StorageManager("./test_avg_data"));
+ std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+ aggregation_handle_avg_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &long_non_null_type),
+ 10,
+ storage_manager_.get()));
+ std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+ aggregation_handle_avg_->createGroupByHashTable(
+ HashTableImplType::kSimpleScalarSeparateChaining,
+ std::vector<const Type *>(1, &long_non_null_type),
+ 10,
+ storage_manager_.get()));
+
+ AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+ destination_hash_table.get());
+
+ AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
+ static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+ source_hash_table.get());
+
+ AggregationHandleAvg *aggregation_handle_avg_derived =
+ static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
+ // We create three keys: the first is present in both hash tables, the second
+ // is present only in the source hash table, and the third is present only in
+ // the destination hash table.
+ std::vector<TypedValue> common_key;
+ common_key.emplace_back(static_cast<std::int64_t>(0));
+ std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+ exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+ exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+ const std::int64_t common_key_source_avg = 355;
+ TypedValue common_key_source_avg_val(common_key_source_avg);
+
+ const std::int64_t common_key_destination_avg = 295;
+ TypedValue common_key_destination_avg_val(common_key_destination_avg);
+
+ const std::int64_t exclusive_key_source_avg = 1;
+ TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg);
+
+ const std::int64_t exclusive_key_destination_avg = 1;
+ TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg);
+
+ std::unique_ptr<AggregationStateAvg> common_key_source_state(
+ static_cast<AggregationStateAvg *>(
+ aggregation_handle_avg_->createInitialState()));
+ std::unique_ptr<AggregationStateAvg> common_key_destination_state(
+ static_cast<AggregationStateAvg *>(
+ aggregation_handle_avg_->createInitialState()));
+ std::unique_ptr<AggregationStateAvg> exclusive_key_source_state(
+ static_cast<AggregationStateAvg *>(
+ aggregation_handle_avg_->createInitialState()));
+ std::unique_ptr<AggregationStateAvg> exclusive_key_destination_state(
+ static_cast<AggregationStateAvg *>(
+ aggregation_handle_avg_->createInitialState()));
+
+ // Feed exactly one value into each per-key state.
+ aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(),
+ common_key_source_avg_val);
+
+ aggregation_handle_avg_derived->iterateUnaryInl(
+ common_key_destination_state.get(), common_key_destination_avg_val);
+
+ aggregation_handle_avg_derived->iterateUnaryInl(
+ exclusive_key_destination_state.get(), exclusive_key_destination_avg_val);
+
+ aggregation_handle_avg_derived->iterateUnaryInl(
+ exclusive_key_source_state.get(), exclusive_key_source_avg_val);
+
+ // Add the key-state pairs to the hash tables (two entries per table).
+ source_hash_table_derived->putCompositeKey(common_key,
+ *common_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ common_key, *common_key_destination_state);
+ source_hash_table_derived->putCompositeKey(exclusive_source_key,
+ *exclusive_key_source_state);
+ destination_hash_table_derived->putCompositeKey(
+ exclusive_destination_key, *exclusive_key_destination_state);
+
+ EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+ EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+ aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
+ destination_hash_table.get());
+
+ EXPECT_EQ(3u, destination_hash_table_derived->numEntries());  // The source-only key must now exist in the destination.
+
+ CheckAvgValue<double>(  // Common key: one value from each table, so the expected average is their mean.
+ (common_key_destination_avg_val.getLiteral<std::int64_t>() +
+ common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
+ *aggregation_handle_avg_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+ CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
+ *aggregation_handle_avg_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(
+ exclusive_destination_key)));
+ CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
+ *aggregation_handle_avg_derived,
+ *(destination_hash_table_derived->getSingleCompositeKey(  // Look the key up in the merge target so the merge result is what gets verified.
+ exclusive_source_key)));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_ExecutionHeuristics
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
// Get input.
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
- window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
execution_plan_->addRelationalOperator(
new WindowAggregationOperator(query_handle_->query_id(),
+ *input_relation_info->relation,
*output_relation,
window_aggr_state_index,
insert_destination_index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
glog
- quickstep_expressions_aggregation_AggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
quickstep_queryoptimizer_OptimizerTree
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_Expression
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
#include <utility>
#include <vector>
-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/Expression.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
}
WindowAggregateFunctionPtr WindowAggregateFunction::Create(
- const ::quickstep::AggregateFunction &window_aggregate,
+ const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..279dfa1 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,7 +33,7 @@
namespace quickstep {
-class AggregateFunction;
+class WindowAggregateFunction;
class Type;
namespace optimizer {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
* @return The WindowAggregateFunction singleton (from the expression system)
* for this node.
**/
- inline const ::quickstep::AggregateFunction& window_aggregate() const {
+ inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
return window_aggregate_;
}
@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
* @param is_distinct Whether this is a DISTINCT aggregation.
* @return A new AggregateFunctionPtr.
**/
- static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+ static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
* @param window_info The window info of the window aggregate function.
* @param is_distinct Indicates whether this is a DISTINCT aggregation.
*/
- WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+ WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
// window_aggregate_. If it really needs to be seperated from the
// AggregationFunction, a new class for WindowAggregationFunction should be
// created as quickstep::WindowAggregateFunction.
- const ::quickstep::AggregateFunction &window_aggregate_;
+ const ::quickstep::WindowAggregateFunction &window_aggregate_;
std::vector<ScalarPtr> arguments_;
const WindowInfo window_info_;
const std::string window_name_;