You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by sh...@apache.org on 2016/07/08 18:22:10 UTC
[2/2] incubator-quickstep git commit: Added handle for avg
Added handle for avg
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/29664050
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/29664050
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/29664050
Branch: refs/heads/SQL-window-aggregation
Commit: 2966405066b223d0079f8df3e231ed56750259ad
Parents: 7671a58
Author: shixuan-fan <sh...@apache.org>
Authored: Fri Jul 8 18:23:47 2016 +0000
Committer: shixuan-fan <sh...@apache.org>
Committed: Fri Jul 8 18:23:47 2016 +0000
----------------------------------------------------------------------
expressions/CMakeLists.txt | 1 +
expressions/window_aggregation/CMakeLists.txt | 85 +++
.../WindowAggregateFunction.cpp | 46 ++
.../WindowAggregateFunction.hpp | 149 ++++++
.../WindowAggregateFunction.proto | 25 +
.../WindowAggregateFunctionAvg.cpp | 85 +++
.../WindowAggregateFunctionAvg.hpp | 75 +++
.../WindowAggregateFunctionFactory.cpp | 78 +++
.../WindowAggregateFunctionFactory.hpp | 102 ++++
.../WindowAggregationHandle.hpp | 130 +++++
.../WindowAggregationHandleAvg.cpp | 305 +++++++++++
.../WindowAggregationHandleAvg.hpp | 113 ++++
.../window_aggregation/WindowAggregationID.hpp | 44 ++
.../WindowAggregationHandleAvg_unittest.cpp | 526 +++++++++++++++++++
query_optimizer/CMakeLists.txt | 2 +
query_optimizer/ExecutionGenerator.cpp | 5 +-
query_optimizer/expressions/CMakeLists.txt | 2 +-
.../expressions/WindowAggregateFunction.cpp | 4 +-
.../expressions/WindowAggregateFunction.hpp | 10 +-
query_optimizer/resolver/CMakeLists.txt | 2 +
query_optimizer/resolver/Resolver.cpp | 40 +-
query_optimizer/resolver/Resolver.hpp | 5 +-
relational_operators/CMakeLists.txt | 1 +
.../WindowAggregationOperator.cpp | 10 +-
.../WindowAggregationOperator.hpp | 29 +-
relational_operators/WorkOrder.proto | 3 +-
storage/CMakeLists.txt | 11 +-
storage/WindowAggregationOperationState.cpp | 54 +-
storage/WindowAggregationOperationState.hpp | 53 +-
storage/WindowAggregationOperationState.proto | 16 +-
...WindowAggregationOperationState_unittest.cpp | 14 +-
31 files changed, 1897 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/CMakeLists.txt b/expressions/CMakeLists.txt
index 53ad5d4..6ef3c24 100644
--- a/expressions/CMakeLists.txt
+++ b/expressions/CMakeLists.txt
@@ -17,6 +17,7 @@ add_subdirectory(aggregation)
add_subdirectory(predicate)
add_subdirectory(scalar)
add_subdirectory(table_generator)
+add_subdirectory(window_aggregation)
QS_PROTOBUF_GENERATE_CPP(expressions_Expressions_proto_srcs
expressions_Expressions_proto_hdrs
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/CMakeLists.txt b/expressions/window_aggregation/CMakeLists.txt
new file mode 100644
index 0000000..fa4eae9
--- /dev/null
+++ b/expressions/window_aggregation/CMakeLists.txt
@@ -0,0 +1,85 @@
+# Copyright 2011-2015 Quickstep Technologies LLC.
+# Copyright 2015 Pivotal Software, Inc.
+# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+# University of Wisconsin—Madison.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+QS_PROTOBUF_GENERATE_CPP(expressions_windowaggregation_WindowAggregateFunction_proto_srcs
+ expressions_windowaggregation_WindowAggregateFunction_proto_hdrs
+ WindowAggregateFunction.proto)
+
+# Declare micro-libs:
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction
+ WindowAggregateFunction.cpp
+ WindowAggregateFunction.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ ${expressions_windowaggregation_WindowAggregateFunction_proto_srcs})
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ WindowAggregateFunctionAvg.cpp
+ WindowAggregateFunctionAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ WindowAggregateFunctionFactory.cpp
+ WindowAggregateFunctionFactory.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandle
+ ../../empty_src.cpp
+ WindowAggregationHandle.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ WindowAggregationHandleAvg.cpp
+ WindowAggregationHandleAvg.hpp)
+add_library(quickstep_expressions_windowaggregation_WindowAggregationID
+ ../../empty_src.cpp
+ WindowAggregationID.hpp)
+
+# Link dependencies:
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction
+ glog
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandle
+ glog
+ quickstep_catalog_CatalogTypedefs
+ quickstep_storage_HashTableBase
+ quickstep_types_TypedValue
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionFactory
+ quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregateFunctionAvg
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ quickstep_expressions_windowaggregation_WindowAggregationID
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
+ quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_windowaggregation_WindowAggregationHandleAvg
+ quickstep_catalog_CatalogTypedefs
+ quickstep_expressions_windowaggregation_WindowAggregationHandle
+ quickstep_types_Type
+ quickstep_types_TypeFactory
+ quickstep_types_TypeID
+ quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVector
+ quickstep_types_operations_binaryoperations_BinaryOperation
+ quickstep_types_operations_binaryoperations_BinaryOperationFactory
+ quickstep_types_operations_binaryoperations_BinaryOperationID
+ quickstep_utility_Macros)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.cpp b/expressions/window_aggregation/WindowAggregateFunction.cpp
new file mode 100644
index 0000000..88ba0b9
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.cpp
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+serialization::WindowAggregateFunction WindowAggregateFunction::getProto() const {
+ serialization::WindowAggregateFunction proto;
+ switch (win_agg_id_) {
+ case WindowAggregationID::kAvg:
+ proto.set_window_aggregation_id(serialization::WindowAggregateFunction::AVG);
+ break;
+ default: {
+ LOG(FATAL) << "Unrecognized WindowAggregationID: "
+ << static_cast<std::underlying_type<WindowAggregationID>::type>(win_agg_id_);
+ }
+ }
+
+ return proto;
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.hpp b/expressions/window_aggregation/WindowAggregateFunction.hpp
new file mode 100644
index 0000000..9cc5d74
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.hpp
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
+
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregationHandle;
+class Type;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief A class representing a particular window aggregate function in the
+ * abstract sense. Each named aggregate function is represented by a
+ * singleton subclass of WindowAggregateFunction.
+ *
+ * WindowAggregateFunction provides informational methods about the
+ * applicability of a particular window aggregate function to particular
+ * argument Type(s). The actual implementation of the window aggregate
+ * functions' logic is in the WindowAggregationHandle class hierarchy, and can
+ * be different depending on the particular argument Type(s) given to the window
+ * aggregate. To perform a window aggregation, a caller should first call
+ * WindowAggregateFunction::createHandle() to instantiate a
+ * WindowAggregationHandle object, then use the methods of
+ * WindowAggregationHandle to do the actual window aggregation. Finally, delete
+ * the WindowAggregationHandle when finished with it.
+ *
+ * See WindowAggregationHandle for more detailed information about how
+ * window aggregates are actually computed.
+ **/
+class WindowAggregateFunction {
+ public:
+ /**
+ * @brief Get the ID of this window aggregate (i.e. its unique ID amongst all
+ * the WindowAggregateFunctions).
+ *
+ * @return The WindowAggregationID of this WindowAggregateFunction.
+ **/
+ inline WindowAggregationID getWindowAggregationID() const {
+ return win_agg_id_;
+ }
+
+ /**
+ * @brief Get the human-readable name of this WindowAggregateFunction.
+ *
+ * @return The human-readable name of this WindowAggregateFunction.
+ **/
+ virtual std::string getName() const = 0;
+
+ /**
+ * @brief Get the serialized protocol buffer representation of this
+ * WindowAggregateFunction.
+ *
+ * @return A serialized protocol buffer representation of this
+ * WindowAggregateFunction.
+ **/
+ virtual serialization::WindowAggregateFunction getProto() const;
+
+ /**
+ * @brief Determine if this WindowAggregateFunction can be applied to
+ * arguments of particular Type(s).
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @return Whether this WindowAggregateFunction is applicable to the given
+ * argument_types.
+ **/
+ virtual bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const = 0;
+
+ /**
+ * @brief Determine the result Type for this WindowAggregateFunction given
+ * arguments of particular Type(s).
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @return The result Type for this WindowAggregateFunction applied to the
+ * specified argument_types, or nullptr if this
+ * WindowAggregateFunction is not applicable to the specified Type(s).
+ **/
+ virtual const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const = 0;
+
+ /**
+ * @brief Create a WindowAggregationHandle to compute aggregates.
+ *
+ * @warning It is an error to call this method for argument_types which this
+ * WindowAggregateFunction can not apply to. For safety, check
+ * canApplyToTypes() first.
+ *
+ * @param argument_types A list of zero or more Types (in order) for
+ * arguments to this WindowAggregateFunction.
+ * @param partition_key_types A list of zero or more Types (in order) for
+ * the partition keys.
+ * @note The frame mode (ROWS or RANGE) and the numbers of preceding and
+ * following rows are not parameters of this method.
+ *
+ * @return A new WindowAggregationHandle that can be used to compute this
+ * WindowAggregateFunction over the specified argument_types. Caller
+ * is responsible for deleting the returned object.
+ **/
+ virtual WindowAggregationHandle* createHandle(
+ std::vector<const Type*> &&argument_types,
+ std::vector<const Type*> &&partition_key_types) const = 0;
+
+ protected:
+ explicit WindowAggregateFunction(const WindowAggregationID win_agg_id)
+ : win_agg_id_(win_agg_id) {
+ }
+
+ private:
+ const WindowAggregationID win_agg_id_;
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunction);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunction.proto
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunction.proto b/expressions/window_aggregation/WindowAggregateFunction.proto
new file mode 100644
index 0000000..173ff0e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunction.proto
@@ -0,0 +1,25 @@
+// Copyright 2015 Pivotal Software, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+message WindowAggregateFunction {
+ enum WindowAggregationID {
+ AVG = 0;
+ }
+
+ required WindowAggregationID window_aggregation_id = 1;
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
new file mode 100644
index 0000000..e9a4453
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.cpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+bool WindowAggregateFunctionAvg::canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const {
+ // AVG is unary.
+ if (argument_types.size() != 1) {
+ return false;
+ }
+
+ // Argument must be addable and divisible.
+ return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
+ .canApplyToTypes(*argument_types.front(), *argument_types.front())
+ && BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+ .canApplyToTypes(*argument_types.front(), TypeFactory::GetType(kDouble));
+}
+
+const Type* WindowAggregateFunctionAvg::resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const {
+ if (!canApplyToTypes(argument_types)) {
+ return nullptr;
+ }
+
+ // The type used to sum values is nullable, and we automatically widen int to
+ // long and float to double to have more headroom when adding up many values.
+ const Type *sum_type = &(argument_types.front()->getNullableVersion());
+ switch (sum_type->getTypeID()) {
+ case kInt:
+ sum_type = &TypeFactory::GetType(kLong, true);
+ break;
+ case kFloat:
+ sum_type = &TypeFactory::GetType(kDouble, true);
+ break;
+ default:
+ break;
+ }
+
+ return BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide)
+ .resultTypeForArgumentTypes(*sum_type, TypeFactory::GetType(kDouble));
+}
+
+WindowAggregationHandle* WindowAggregateFunctionAvg::createHandle(
+ std::vector<const Type*> &&argument_types,
+ std::vector<const Type*> &&partition_key_types) const {
+ DCHECK(canApplyToTypes(argument_types))
+ << "Attempted to create an WindowAggregationHandleAvg for argument Type(s)"
+ << " that AVG can not be applied to.";
+
+ return new WindowAggregationHandleAvg(*argument_types.front(),
+ std::move(partition_key_types));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
new file mode 100644
index 0000000..18e1022
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionAvg.hpp
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
+
+#include <string>
+#include <vector>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregationHandle;
+class Type;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregateFunction representing SQL AVG() OVER term.
+ **/
+class WindowAggregateFunctionAvg : public WindowAggregateFunction {
+ public:
+ static const WindowAggregateFunctionAvg& Instance() {
+ static WindowAggregateFunctionAvg instance;
+ return instance;
+ }
+
+ std::string getName() const override {
+ return "AVG";
+ }
+
+ bool canApplyToTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ const Type* resultTypeForArgumentTypes(
+ const std::vector<const Type*> &argument_types) const override;
+
+ WindowAggregationHandle* createHandle(
+ std::vector<const Type*> &&argument_types,
+ std::vector<const Type*> &&partition_key_types) const override;
+
+ private:
+ WindowAggregateFunctionAvg()
+ : WindowAggregateFunction(WindowAggregationID::kAvg) {
+ }
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionAvg);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_AVG_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
new file mode 100644
index 0000000..afd53ef
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.cpp
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+
+#include <string>
+#include <type_traits>
+
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
+#include "expressions/window_aggregation/WindowAggregateFunctionAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+const WindowAggregateFunction& WindowAggregateFunctionFactory::Get(
+ const WindowAggregationID agg_id) {
+ switch (agg_id) {
+ case WindowAggregationID::kAvg:
+ return WindowAggregateFunctionAvg::Instance();
+ default: {
+ LOG(FATAL) << "Unrecognized WindowAggregationID: "
+ << static_cast<std::underlying_type<WindowAggregationID>::type>(agg_id);
+ }
+ }
+}
+
+const WindowAggregateFunction* WindowAggregateFunctionFactory::GetByName(
+ const std::string &name) {
+ if (name == "avg") {
+ return &WindowAggregateFunctionAvg::Instance();
+ } else {
+ return nullptr;
+ }
+}
+
+bool WindowAggregateFunctionFactory::ProtoIsValid(
+ const serialization::WindowAggregateFunction &proto) {
+ return proto.IsInitialized()
+ && serialization::WindowAggregateFunction::WindowAggregationID_IsValid(proto.window_aggregation_id());
+}
+
+const WindowAggregateFunction& WindowAggregateFunctionFactory::ReconstructFromProto(
+ const serialization::WindowAggregateFunction &proto) {
+ DCHECK(ProtoIsValid(proto))
+ << "Attempted to reconstruct an WindowAggregateFunction from an invalid proto:\n"
+ << proto.DebugString();
+
+ switch (proto.window_aggregation_id()) {
+ case serialization::WindowAggregateFunction::AVG:
+ return WindowAggregateFunctionAvg::Instance();
+ default: {
+ LOG(FATAL) << "Unrecognized serialization::WindowAggregateFunction::WindowAggregationID: "
+ << proto.window_aggregation_id()
+ << "\nFull proto debug string:\n"
+ << proto.DebugString();
+ }
+ }
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
new file mode 100644
index 0000000..2254482
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregateFunctionFactory.hpp
@@ -0,0 +1,102 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
+
+#include <string>
+
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class WindowAggregateFunction;
+namespace serialization { class WindowAggregateFunction; }
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief All-static factory class that provides access to the various concrete
+ * implementations of WindowAggregateFunction.
+ *
+ * WindowAggregateFunctionFactory allows client code to use any
+ * WindowAggregateFunction in Quickstep in a generic way without having to know
+ * about all the specific subclasses of WindowAggregateFunction. In particular,
+ * it is used to deserialize WindowAggregateFunctions used in
+ * WindowAggregationOperationState from their protobuf representations
+ * (originally created by the optimizer) when deserializing a QueryContext.
+ **/
+class WindowAggregateFunctionFactory {
+ public:
+ /**
+ * @brief Get a particular WindowAggregateFunction by its ID.
+ *
+ * @param agg_id The ID of the desired WindowAggregateFunction.
+ * @return A reference to the singleton instance of the
+ * WindowAggregateFunction specified by agg_id.
+ **/
+ static const WindowAggregateFunction& Get(const WindowAggregationID agg_id);
+
+ /**
+ * @brief Get a particular WindowAggregateFunction by its name in SQL syntax.
+ *
+ * @param name The name of the desired WindowAggregateFunction in lower case.
+ * @return A pointer to the WindowAggregateFunction specified by name, or NULL
+ * if name does not match any known WindowAggregateFunction.
+ **/
+ static const WindowAggregateFunction* GetByName(const std::string &name);
+
+ /**
+ * @brief Determine if a serialized protobuf representation of a
+ * WindowAggregateFunction is fully-formed and valid.
+ *
+ * @param proto A serialized protobuf representation of a
+ * WindowAggregateFunction to check for validity.
+ * @return Whether proto is fully-formed and valid.
+ **/
+ static bool ProtoIsValid(const serialization::WindowAggregateFunction &proto);
+
+ /**
+ * @brief Get the WindowAggregateFunction represented by a proto.
+ *
+ * @warning It is an error to call this method with an invalid proto.
+ * ProtoIsValid() should be called first to check.
+ *
+ * @param proto A serialized protobuf representation of a
+ * WindowAggregateFunction.
+ * @return The WindowAggregateFunction represented by proto.
+ **/
+ static const WindowAggregateFunction& ReconstructFromProto(
+ const serialization::WindowAggregateFunction &proto);
+
+ private:
+ // Class is all-static and can not be instantiated.
+ WindowAggregateFunctionFactory();
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunctionFactory);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATE_FUNCTION_FACTORY_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandle.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandle.hpp b/expressions/window_aggregation/WindowAggregationHandle.hpp
new file mode 100644
index 0000000..77b1e76
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandle.hpp
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class ColumnVector;
+class InsertDestinationInterface;
+class Scalar;
+class StorageManager;
+class Type;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief WindowAggregationHandle encapsulates logic for actually computing
+ * window aggregates with particular argument(s).
+ * @note See also WindowAggregateFunction, which represents a SQL aggregate
+ * function in the abstract sense.
+ *
+ * A WindowAggregationHandle is created by calling
+ * WindowAggregateFunction::createHandle(). The WindowAggregationHandle object
+ * provides methods that are used to actually compute the window aggregate,
+ * storing intermediate results in WindowAggregationState objects.
+ *
+ * The work flow for computing a window aggregate is:
+ * 1. Create an initial state by createInitialState().
+ * 2. One thread will handle all the computation, iterating from the first
+ * tuple to the last tuple. Note there will be two modes that could be
+ * used upon different situations:
+ * a. If the window aggregate is defined as accumulative, which are:
+ * i. Functions applied to whole partition, such as rank(), ntile()
+ * and dense_rank().
+ * ii. The window frame is defined as "BETWEEN UNBOUNDED PRECEDING
+ * AND CURRENT ROW" or "BETWEEN CURRENT ROW AND UNBOUNDED
+ * FOLLOWING".
+ * Then, for functions except median, we could store some global
+ * values in the state without keeping all the tuple values around.
+ * b. If the window frame is sliding, such as "BETWEEN 3 PRECEDING AND
+ * 3 FOLLOWING", we have to store all the tuples in the state so that
+ * we could know which values should be dropped as the window slides.
+ * For each computed value, generate a tuple store in the column vector.
+ * 3. Insert the new column into the original relation and return.
+ *
+ * TODO(Shixuan): Currently we don't support parallelization. The basic idea for
+ * parallelization is to calculate the partial result inside each block. Each
+ * block could visit the following blocks as long as the block's last partition
+ * is not finished. WindowAggregationOperationState will be used for handling
+ * the global state of the calculation.
+ **/
+
+class WindowAggregationHandle {
+ public:
+ /**
+ * @brief Destructor.
+ **/
+ virtual ~WindowAggregationHandle() {}
+
+ /**
+ * @brief Calculate the window aggregate result.
+ *
+ * @param arguments The argument expressions of the window aggregate.
+ * @param block_ids The ids of the blocks that hold the input tuples.
+ * @param partition_by_ids The attribute_id of the partition keys in the
+ * input tuples.
+ * @param relation The schema of the relation the blocks belong to.
+ * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+ * @param num_preceding The number of rows/range that precedes the current
+ * row.
+ * @param num_following The number of rows/range that follows the current
+ * row.
+ * @param storage_manager A pointer to the storage manager.
+ * @param output_destination The destination for output.
+ *
+ * @note NOTE(review): WindowAggregationHandleAvg treats a value of -1 for
+ * num_preceding / num_following as "unbounded"; confirm that this is
+ * the contract for every handle.
+ **/
+ virtual void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+ const std::vector<block_id> &block_ids,
+ const std::vector<attribute_id> &partition_by_ids,
+ const CatalogRelationSchema &relation,
+ const bool is_row,
+ const std::int64_t num_preceding,
+ const std::int64_t num_following,
+ StorageManager *storage_manager,
+ InsertDestinationInterface *output_destination) const = 0;
+
+ protected:
+ /**
+ * @brief Constructor.
+ *
+ * Takes no arguments: the partition keys, frame mode, frame bounds and
+ * storage manager are all passed to calculate() instead.
+ **/
+ WindowAggregationHandle() {}
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandle);
+};
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.cpp b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
new file mode 100644
index 0000000..62f5a88
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.cpp
@@ -0,0 +1,305 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/scalar/Scalar.hpp"
+#include "storage/InsertDestinationInterface.hpp"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonFactory.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class StorageManager;
+
+// Builds a handle that averages values of 'type'. One equality comparator is
+// created per entry of 'partition_key_types', in the same order, so that
+// partition keys can later be compared position by position.
+WindowAggregationHandleAvg::WindowAggregationHandleAvg(
+    const Type &type,
+    std::vector<const Type*> &&partition_key_types)
+    : argument_type_(type) {
+  // Accumulate Int as Long and Float as Double to leave headroom when many
+  // values are added; every other type is summed in its own type.
+  const TypeID argument_type_id = type.getTypeID();
+  TypeID accumulator_type_id = argument_type_id;
+  if (argument_type_id == kInt || argument_type_id == kLong) {
+    accumulator_type_id = kLong;
+  } else if (argument_type_id == kFloat || argument_type_id == kDouble) {
+    accumulator_type_id = kDouble;
+  }
+  sum_type_ = &(TypeFactory::GetType(accumulator_type_id));
+
+  const auto &add_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd);
+  const auto &divide_operation =
+      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+
+  // The result is the type of (sum / double count), made nullable because
+  // AVG() over 0 values (or all NULL values) is NULL.
+  result_type_ =
+      &(divide_operation
+            .resultTypeForArgumentTypes(*sum_type_, TypeFactory::GetType(kDouble))
+            ->getNullableVersion());
+
+  // Operator that adds an argument value to the running sum.
+  fast_add_operator_.reset(
+      add_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_, argument_type_));
+
+  // Operator that divides the sum by the count to get the final average.
+  divide_operator_.reset(
+      divide_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_,
+                                                           TypeFactory::GetType(kDouble)));
+
+  // Equality comparators used to decide whether two tuples belong to the same
+  // partition.
+  const auto &equal_comparison =
+      ComparisonFactory::GetComparison(ComparisonID::kEqual);
+  for (const Type *key_type : partition_key_types) {
+    equal_comparators_.emplace_back(
+        equal_comparison.makeUncheckedComparatorForTypes(*key_type, *key_type));
+  }
+}
+
+// Computes the window average for every tuple of the blocks in 'block_ids',
+// visiting blocks and tuples in order, and inserts each input tuple extended
+// by one extra value (its window average) into 'output_destination'.
+//
+// arguments          Must contain exactly one scalar: the AVG() argument.
+// block_ids          Blocks holding the input tuples; must not be empty.
+// partition_by_ids   Attribute ids of the partition keys in the input tuples.
+// relation           Schema handed to the storage manager to load the blocks.
+// is_row             Forwarded to calculateOneWindow().
+// num_preceding      Frame bound before the current row; -1 means unbounded.
+// num_following      Frame bound after the current row; -1 means unbounded.
+// storage_manager    Used to load every block in 'block_ids'.
+// output_destination Receives one output tuple per input tuple.
+void WindowAggregationHandleAvg::calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                                           const std::vector<block_id> &block_ids,
+                                           const std::vector<attribute_id> &partition_by_ids,
+                                           const CatalogRelationSchema &relation,
+                                           const bool is_row,
+                                           const std::int64_t num_preceding,
+                                           const std::int64_t num_following,
+                                           StorageManager *storage_manager,
+                                           InsertDestinationInterface *output_destination) const {
+  DCHECK(arguments.size() == 1);
+  DCHECK(!block_ids.empty());
+
+  // Owners. A window may reach into neighbouring blocks, so the block
+  // references and both kinds of accessors must stay alive until the whole
+  // computation is done. Previously the argument accessor was a loop-local
+  // object whose address was stored (dangling after each iteration), and the
+  // tuple accessors were never released.
+  // NOTE(review): assumes BlockReference is a copyable/movable handle that
+  // keeps its block loaded while it exists -- confirm.
+  std::vector<BlockReference> blocks;
+  std::vector<std::unique_ptr<ValueAccessor>> owned_tuple_accessors;
+  std::vector<std::unique_ptr<ColumnVectorsValueAccessor>> owned_argument_accessors;
+  blocks.reserve(block_ids.size());
+  owned_tuple_accessors.reserve(block_ids.size());
+  owned_argument_accessors.reserve(block_ids.size());
+
+  // Non-owning views in the shape expected by calculateOneWindow(). The index
+  // of each accessor is the index of its block in 'block_ids'.
+  std::vector<ValueAccessor*> tuple_accessors;
+  std::vector<ColumnVectorsValueAccessor*> argument_accessors;
+  tuple_accessors.reserve(block_ids.size());
+  argument_accessors.reserve(block_ids.size());
+
+  for (const block_id bid : block_ids) {
+    blocks.push_back(storage_manager->getBlock(bid, relation));
+    BlockReference &block = blocks.back();
+    const TupleStorageSubBlock &tuple_block = block->getTupleStorageSubBlock();
+
+    // Tuple accessor for the block.
+    owned_tuple_accessors.emplace_back(tuple_block.createValueAccessor());
+    ValueAccessor *tuple_accessor = owned_tuple_accessors.back().get();
+    tuple_accessors.push_back(tuple_accessor);
+
+    // Argument accessor: the AVG() argument evaluated for every tuple of the
+    // block, stored as the single column 0.
+    owned_argument_accessors.emplace_back(new ColumnVectorsValueAccessor());
+    ColumnVectorsValueAccessor *argument_accessor = owned_argument_accessors.back().get();
+    SubBlocksReference sub_block_ref(tuple_block,
+                                     block->getIndices(),
+                                     block->getIndicesConsistent());
+    argument_accessor->addColumn(
+        arguments.front()->getAllValues(tuple_accessor, &sub_block_ref));
+    argument_accessors.push_back(argument_accessor);
+  }
+
+  // Create a window for each tuple and calculate the window aggregate.
+  for (std::uint32_t current_block_index = 0;
+       current_block_index < block_ids.size();
+       ++current_block_index) {
+    ColumnVectorsValueAccessor *argument_accessor =
+        argument_accessors[current_block_index];
+
+    InvokeOnAnyValueAccessor(
+        tuple_accessors[current_block_index],
+        [&](auto *typed_tuple_accessor) -> void {
+          typed_tuple_accessor->beginIteration();
+          argument_accessor->beginIteration();
+
+          // Both accessors advance in lock step, so calculateOneWindow() can
+          // read the current row from either of them.
+          while (typed_tuple_accessor->next() && argument_accessor->next()) {
+            TypedValue window_aggregate = this->calculateOneWindow(tuple_accessors,
+                                                                   argument_accessors,
+                                                                   partition_by_ids,
+                                                                   current_block_index,
+                                                                   is_row,
+                                                                   num_preceding,
+                                                                   num_following);
+
+            // NOTE(review): assumes getTuple() hands ownership of a newly
+            // allocated Tuple to the caller -- confirm.
+            std::unique_ptr<Tuple> current_tuple(typed_tuple_accessor->getTuple());
+            std::vector<TypedValue> new_tuple;
+            for (const TypedValue &value : *current_tuple) {
+              new_tuple.push_back(value);
+            }
+
+            new_tuple.push_back(std::move(window_aggregate));
+            output_destination->insertTupleInBatch(Tuple(std::move(new_tuple)));
+          }
+        });
+  }
+}
+
+// Computes AVG() over the window frame of the tuple at which the accessors of
+// block 'current_block_index' are currently positioned.
+//
+// The frame consists of the current row, up to 'num_preceding' rows before it
+// and up to 'num_following' rows after it (-1 means unbounded), restricted to
+// rows whose partition key equals that of the current row. Rows are followed
+// across block boundaries. NULL argument values belong to the frame but are
+// neither summed nor counted; if the frame holds no non-NULL value the result
+// is a NULL of the result type.
+//
+// 'is_row' is accepted for interface compatibility but is not consulted: the
+// frame bounds are always interpreted as numbers of rows.
+TypedValue WindowAggregationHandleAvg::calculateOneWindow(
+    std::vector<ValueAccessor*> &tuple_accessors,
+    std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
+    const std::vector<attribute_id> &partition_by_ids,
+    const std::uint32_t current_block_index,
+    const bool is_row,
+    const std::int64_t num_preceding,
+    const std::int64_t num_following) const {
+  ValueAccessor *current_tuple_accessor = tuple_accessors[current_block_index];
+  ColumnVectorsValueAccessor *current_argument_accessor =
+      argument_accessors[current_block_index];
+
+  TypedValue sum = sum_type_->makeZeroValue();
+  std::uint64_t count = 0;
+
+  // Adds one argument value to the running sum, skipping NULLs.
+  const auto accumulate = [&](const TypedValue &value) {
+    if (!value.isNull()) {
+      sum = fast_add_operator_->applyToTypedValues(sum, value);
+      ++count;
+    }
+  };
+
+  // The current row is always part of its own frame.
+  accumulate(current_argument_accessor->getTypedValue(0));
+
+  // Get the partition key for the current row.
+  std::vector<TypedValue> current_row_partition_key;
+  current_row_partition_key.reserve(partition_by_ids.size());
+  for (const attribute_id partition_by_id : partition_by_ids) {
+    current_row_partition_key.push_back(
+        current_tuple_accessor->getTypedValueVirtual(partition_by_id));
+  }
+
+  // Get current position.
+  const tuple_id current_tuple_id = current_tuple_accessor->getCurrentPositionVirtual();
+
+  // Find preceding tuples.
+  {
+    ValueAccessor *tuple_accessor = current_tuple_accessor;
+    ColumnVectorsValueAccessor *argument_accessor = current_argument_accessor;
+    std::int64_t count_preceding = 0;
+    tuple_id preceding_tuple_id = current_tuple_id;
+    // An index into the accessor vectors, not a block_id. The first block is
+    // detected before decrementing, so the test does not depend on the index
+    // type being signed.
+    std::size_t preceding_block_index = current_block_index;
+    while (num_preceding == -1 || count_preceding < num_preceding) {
+      --preceding_tuple_id;
+
+      // If the preceding tuple locates in the previous block, move to the
+      // previous block and continue searching.
+      // TODO(Shixuan): If it is possible to have empty blocks, "if" has to be
+      // changed to "while".
+      if (preceding_tuple_id < 0) {
+        // First tuple of the first block, no more preceding blocks.
+        if (preceding_block_index == 0) {
+          break;
+        }
+        --preceding_block_index;
+
+        tuple_accessor = tuple_accessors[preceding_block_index];
+        argument_accessor = argument_accessors[preceding_block_index];
+        preceding_tuple_id = argument_accessor->getNumTuples() - 1;
+      }
+
+      // Get the partition keys and compare. If not the same partition as the
+      // current row, end searching preceding tuples.
+      if (!samePartition(current_row_partition_key,
+                         tuple_accessor,
+                         preceding_tuple_id,
+                         partition_by_ids)) {
+        break;
+      }
+
+      // The row is inside the frame whether or not its value is NULL.
+      ++count_preceding;
+      accumulate(argument_accessor->getTypedValueAtAbsolutePosition(0, preceding_tuple_id));
+    }
+  }
+
+  // Find following tuples.
+  {
+    // Start again from the current block: the backwards scan above may have
+    // moved its accessors to an earlier block, whose tuple count must not be
+    // used as the boundary here.
+    ValueAccessor *tuple_accessor = current_tuple_accessor;
+    ColumnVectorsValueAccessor *argument_accessor = current_argument_accessor;
+    std::int64_t count_following = 0;
+    tuple_id following_tuple_id = current_tuple_id;
+    std::size_t following_block_index = current_block_index;
+    while (num_following == -1 || count_following < num_following) {
+      ++following_tuple_id;
+
+      // If the following tuple locates in the next block, move to the next
+      // block and continue searching.
+      // TODO(Shixuan): If it is possible to have empty blocks, "if" has to be
+      // changed to "while".
+      if (following_tuple_id >= argument_accessor->getNumTuples()) {
+        ++following_block_index;
+        // Last tuple of the last block, no more following blocks.
+        if (following_block_index == tuple_accessors.size()) {
+          break;
+        }
+
+        tuple_accessor = tuple_accessors[following_block_index];
+        argument_accessor = argument_accessors[following_block_index];
+        following_tuple_id = 0;
+      }
+
+      // Get the partition keys and compare. If not the same partition as the
+      // current row, end searching following tuples.
+      if (!samePartition(current_row_partition_key,
+                         tuple_accessor,
+                         following_tuple_id,
+                         partition_by_ids)) {
+        break;
+      }
+
+      // The row is inside the frame whether or not its value is NULL.
+      ++count_following;
+      accumulate(argument_accessor->getTypedValueAtAbsolutePosition(0, following_tuple_id));
+    }
+  }
+
+  // AVG() over no non-NULL values is NULL.
+  if (count == 0) {
+    return result_type_->makeNullValue();
+  }
+
+  return divide_operator_->applyToTypedValues(sum,
+                                              TypedValue(static_cast<double>(count)));
+}
+
+// Returns true iff the tuple at absolute position 'boundary_tuple_id' of
+// 'tuple_accessor' has the same partition key as 'current_row_partition_key'.
+//
+// The i-th entry of 'current_row_partition_key' is compared with attribute
+// partition_by_ids[i] of the candidate tuple using the i-th equality
+// comparator built by the constructor. With no partition keys every tuple
+// matches.
+bool WindowAggregationHandleAvg::samePartition(
+    const std::vector<TypedValue> &current_row_partition_key,
+    ValueAccessor *tuple_accessor,
+    const tuple_id boundary_tuple_id,
+    const std::vector<attribute_id> &partition_by_ids) const {
+  DCHECK_EQ(partition_by_ids.size(), current_row_partition_key.size());
+  DCHECK_LE(partition_by_ids.size(), equal_comparators_.size());
+
+  return InvokeOnAnyValueAccessor(
+      tuple_accessor,
+      [&](auto *typed_accessor) -> bool {
+        for (std::size_t key_index = 0;
+             key_index < partition_by_ids.size();
+             ++key_index) {
+          // The first differing key component settles the answer.
+          if (!equal_comparators_[key_index]->compareTypedValues(
+                  current_row_partition_key[key_index],
+                  typed_accessor->getTypedValueAtAbsolutePosition(
+                      partition_by_ids[key_index], boundary_tuple_id))) {
+            return false;
+          }
+        }
+
+        return true;
+      });
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationHandleAvg.hpp b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
new file mode 100644
index 0000000..115152e
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationHandleAvg.hpp
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "types/Type.hpp"
+#include "types/TypedValue.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class ColumnVector;
+class ColumnVectorsValueAccessor;
+class InsertDestinationInterface;
+class StorageManager;
+class ValueAccessor;
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief A WindowAggregationHandle for average.
+ **/
+class WindowAggregationHandleAvg : public WindowAggregationHandle {
+ public:
+  ~WindowAggregationHandleAvg() override {}
+
+  /**
+   * @brief Calculate AVG() over the window of every input tuple and insert
+   *        each input tuple, extended by its window average, into
+   *        output_destination.
+   *
+   * @param arguments The argument expressions; exactly one is expected.
+   * @param block_ids The ids of the blocks that hold the input tuples.
+   * @param partition_by_ids The attribute_id of the partition keys in the
+   *        input tuples.
+   * @param relation The schema of the relation the blocks belong to.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding The number of rows/range that precedes the current
+   *        row, -1 for unbounded.
+   * @param num_following The number of rows/range that follows the current
+   *        row, -1 for unbounded.
+   * @param storage_manager A pointer to the storage manager.
+   * @param output_destination The destination for output.
+   **/
+  void calculate(const std::vector<std::unique_ptr<const Scalar>> &arguments,
+                 const std::vector<block_id> &block_ids,
+                 const std::vector<attribute_id> &partition_by_ids,
+                 const CatalogRelationSchema &relation,
+                 const bool is_row,
+                 const std::int64_t num_preceding,
+                 const std::int64_t num_following,
+                 StorageManager *storage_manager,
+                 InsertDestinationInterface *output_destination) const override;
+
+ private:
+  friend class WindowAggregateFunctionAvg;
+
+  /**
+   * @brief Constructor.
+   *
+   * @param type Type of the avg value.
+   * @param partition_key_types The types of the partition keys, in the same
+   *        order as the partition_by_ids later passed to calculate(). One
+   *        equality comparator is built per entry.
+   **/
+  explicit WindowAggregationHandleAvg(const Type &type,
+                                      std::vector<const Type*> &&partition_key_types);
+
+  /**
+   * @brief Compute the average over the window frame of the tuple at which
+   *        the accessors of block current_block_index are currently
+   *        positioned.
+   *
+   * @param tuple_accessors One tuple accessor per input block.
+   * @param argument_accessors One accessor per input block holding the
+   *        evaluated AVG() argument as column 0.
+   * @param partition_by_ids The attribute_id of the partition keys.
+   * @param current_block_index Index (into both accessor vectors) of the
+   *        block that contains the current row.
+   * @param is_row True if the frame mode is ROWS, false if it is RANGE.
+   * @param num_preceding Frame bound before the current row, -1 for
+   *        unbounded.
+   * @param num_following Frame bound after the current row, -1 for
+   *        unbounded.
+   * @return The window average for the current row.
+   **/
+  TypedValue calculateOneWindow(
+      std::vector<ValueAccessor*> &tuple_accessors,
+      std::vector<ColumnVectorsValueAccessor*> &argument_accessors,
+      const std::vector<attribute_id> &partition_by_ids,
+      const std::uint32_t current_block_index,
+      const bool is_row,
+      const std::int64_t num_preceding,
+      const std::int64_t num_following) const;
+
+  /**
+   * @brief Check whether the tuple at absolute position boundary_tuple_id of
+   *        tuple_accessor has the partition key current_row_partition_key.
+   *
+   * @param current_row_partition_key The partition key values of the current
+   *        row, one per entry of partition_by_ids.
+   * @param tuple_accessor The accessor that contains the candidate tuple.
+   * @param boundary_tuple_id The absolute position of the candidate tuple.
+   * @param partition_by_ids The attribute_id of the partition keys.
+   * @return True if every partition key component compares equal.
+   **/
+  bool samePartition(const std::vector<TypedValue> &current_row_partition_key,
+                     ValueAccessor *tuple_accessor,
+                     const tuple_id boundary_tuple_id,
+                     const std::vector<attribute_id> &partition_by_ids) const;
+
+  // Type of the AVG() argument.
+  const Type &argument_type_;
+  // Type used for the running sum (Long for Int/Long, Double for
+  // Float/Double, otherwise the argument type itself).
+  const Type *sum_type_;
+  // Nullable type of (sum / double count).
+  const Type *result_type_;
+  // Adds an argument value to the running sum.
+  std::unique_ptr<UncheckedBinaryOperator> fast_add_operator_;
+  // Divides the sum by the count.
+  std::unique_ptr<UncheckedBinaryOperator> divide_operator_;
+  // One equality comparator per partition key, in partition key order.
+  std::vector<std::unique_ptr<UncheckedComparator>> equal_comparators_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregationHandleAvg);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_HANDLE_AVG_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/WindowAggregationID.hpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/WindowAggregationID.hpp b/expressions/window_aggregation/WindowAggregationID.hpp
new file mode 100644
index 0000000..74c948f
--- /dev/null
+++ b/expressions/window_aggregation/WindowAggregationID.hpp
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+#define QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
+
+namespace quickstep {
+
+/** \addtogroup Expressions
+ * @{
+ */
+
+/**
+ * @brief The possible types of window aggregations.
+ **/
+// No enumerator has an explicit value, so they are numbered from zero in
+// declaration order; reordering or inserting entries changes the underlying
+// values.
+enum class WindowAggregationID {
+ kAvg,
+ kCount,
+ kMin,
+ kMax,
+ kSum
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_EXPRESSIONS_WINDOW_AGGREGATION_WINDOW_AGGREGATION_ID_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
----------------------------------------------------------------------
diff --git a/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
new file mode 100644
index 0000000..6a7d161
--- /dev/null
+++ b/expressions/window_aggregation/tests/WindowAggregationHandleAvg_unittest.cpp
@@ -0,0 +1,526 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunctionFactory.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
+#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
+#include "expressions/window_aggregation/WindowAggregationID.hpp"
+#include "storage/StorageManager.hpp"
+#include "types/CharType.hpp"
+#include "types/DateOperatorOverloads.hpp"
+#include "types/DatetimeIntervalType.hpp"
+#include "types/DoubleType.hpp"
+#include "types/FloatType.hpp"
+#include "types/IntType.hpp"
+#include "types/IntervalLit.hpp"
+#include "types/LongType.hpp"
+#include "types/Type.hpp"
+#include "types/TypeFactory.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "types/VarCharType.hpp"
+#include "types/YearMonthIntervalType.hpp"
+#include "types/containers/ColumnVector.hpp"
+
+#include "gtest/gtest.h"
+
+namespace quickstep {
+
+namespace {
+
+ // Number of non-NULL sample values that each fixture helper below feeds to
+ // the handle; the helpers add NULL values on top of these.
+ constexpr int kNumSamples = 100;
+
+} // namespace
+
+// Test fixture for the AVG window aggregation handle.
+//
+// NOTE(review): only initializeHandle(), CanApplyToTypesTest() and
+// ResultTypeForArgumentTypesTest() refer to the window aggregation classes.
+// The remaining helpers use AggregationHandle / AggregationState and call
+// iterateHandle(), finalize(), mergeStates(), createInitialState(),
+// accumulateColumnVectors() and accumulateValueAccessor(), none of which is
+// declared in the visible part of this change -- presumably carried over from
+// the non-window AVG handle test and still to be ported; confirm.
+class WindowAggregationHandleAvgTest : public::testing::Test {
+ protected:
+ // Handle initialization.
+ // NOTE(review): this helper names WindowAggregateFactory and assigns to
+ // handle_avg_, whereas the helpers below use WindowAggregateFunctionFactory
+ // and the members declared at the end of the class are
+ // aggregation_handle_avg_ / aggregation_handle_avg_state_. It also takes two
+ // arguments while its callers in this class pass only the argument type.
+ void initializeHandle(const Type &argument_type,
+ const std::vector<const Type*> &partition_key_types) {
+ WindowAggregateFunction *function =
+ WindowAggregateFactory::Get(WindowAggregationID::kAvg);
+ handle_avg_.reset(function->createHandle(std::vector<const Type*>(1, &argument_type),
+ partition_key_types));
+ }
+
+ // Test canApplyToTypes().
+ // Char and VarChar types are created with a length of 10; every other type
+ // is created without a length parameter.
+ static bool CanApplyToTypesTest(TypeID typeID) {
+ const Type &type = (typeID == kChar || typeID == kVarChar) ?
+ TypeFactory::GetType(typeID, static_cast<std::size_t>(10)) :
+ TypeFactory::GetType(typeID);
+
+ return WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).canApplyToTypes(
+ std::vector<const Type*>(1, &type));
+ }
+
+ // Test resultTypeForArgumentTypes().
+ // Returns true if AVG over a single argument of input_type_id yields a
+ // result whose type id is output_type_id.
+ static bool ResultTypeForArgumentTypesTest(TypeID input_type_id,
+ TypeID output_type_id) {
+ const Type *result_type
+ = WindowAggregateFunctionFactory::Get(WindowAggregationID::kAvg).resultTypeForArgumentTypes(
+ std::vector<const Type*>(1, &TypeFactory::GetType(input_type_id)));
+ return (result_type->getTypeID() == output_type_id);
+ }
+
+ // Expects that finalizing 'state' with 'handle' yields exactly 'expected'.
+ template <typename CppType>
+ static void CheckAvgValue(
+ CppType expected,
+ const AggregationHandle &handle,
+ const AggregationState &state) {
+ EXPECT_EQ(expected, handle.finalize(state).getLiteral<CppType>());
+ }
+
+ // Static templated method for set a meaningful value to data types.
+ template <typename CppType>
+ static void SetDataType(int value, CppType *data) {
+ *data = value;
+ }
+
+ // Feeds one NULL, kNumSamples values and another NULL to the handle one
+ // value at a time, checks the average, then repeats the same values through
+ // a second state that is merged back and checks the average again.
+ template <typename GenericType, typename OutputType = DoubleType>
+ void checkAggregationAvgGeneric() {
+ const GenericType &type = GenericType::Instance(true);
+ initializeHandle(type);
+ EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+ typename GenericType::cpptype val;
+ typename GenericType::cpptype sum;
+ SetDataType(0, &sum);
+
+ iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
+ for (int i = 0; i < kNumSamples; ++i) {
+ if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+ SetDataType(i - 10, &val);
+ } else {
+ SetDataType(static_cast<float>(i - 10)/10, &val);
+ }
+ iterateHandle(aggregation_handle_avg_state_.get(), type.makeValue(&val));
+ sum += val;
+ }
+ iterateHandle(aggregation_handle_avg_state_.get(), type.makeNullValue());
+ CheckAvgValue<typename OutputType::cpptype>(static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+ *aggregation_handle_avg_,
+ *aggregation_handle_avg_state_);
+
+ // Test mergeStates().
+ std::unique_ptr<AggregationState> merge_state(
+ aggregation_handle_avg_->createInitialState());
+ aggregation_handle_avg_->mergeStates(*merge_state,
+ aggregation_handle_avg_state_.get());
+
+ iterateHandle(merge_state.get(), type.makeNullValue());
+ for (int i = 0; i < kNumSamples; ++i) {
+ if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+ SetDataType(i - 10, &val);
+ } else {
+ SetDataType(static_cast<float>(i - 10)/10, &val);
+ }
+ iterateHandle(merge_state.get(), type.makeValue(&val));
+ sum += val;
+ }
+
+ aggregation_handle_avg_->mergeStates(*merge_state,
+ aggregation_handle_avg_state_.get());
+ // 'sum' now holds both rounds, hence the division by 2 * kNumSamples.
+ CheckAvgValue<typename OutputType::cpptype>(
+ static_cast<typename OutputType::cpptype>(sum) / (2 * kNumSamples),
+ *aggregation_handle_avg_,
+ *aggregation_handle_avg_state_);
+ }
+
+ // Builds a column of kNumSamples values plus three NULLs (first, after the
+ // middle sample, last) and stores the sum of the non-NULL values in *sum.
+ // The caller receives a newly allocated column.
+ template <typename GenericType>
+ ColumnVector *createColumnVectorGeneric(const Type &type, typename GenericType::cpptype *sum) {
+ NativeColumnVector *column = new NativeColumnVector(type, kNumSamples + 3);
+
+ typename GenericType::cpptype val;
+ SetDataType(0, sum);
+
+ column->appendTypedValue(type.makeNullValue());
+ for (int i = 0; i < kNumSamples; ++i) {
+ if (type.getTypeID() == kInt || type.getTypeID() == kLong) {
+ SetDataType(i - 10, &val);
+ } else {
+ SetDataType(static_cast<float>(i - 10)/10, &val);
+ }
+ column->appendTypedValue(type.makeValue(&val));
+ *sum += val;
+ // One NULL in the middle.
+ if (i == kNumSamples/2) {
+ column->appendTypedValue(type.makeNullValue());
+ }
+ }
+ column->appendTypedValue(type.makeNullValue());
+
+ return column;
+ }
+
+ // Same check as checkAggregationAvgGeneric(), but the values are supplied
+ // as one column vector built by createColumnVectorGeneric().
+ template <typename GenericType, typename OutputType = DoubleType>
+ void checkAggregationAvgGenericColumnVector() {
+ const GenericType &type = GenericType::Instance(true);
+ initializeHandle(type);
+ EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+ typename GenericType::cpptype sum;
+ SetDataType(0, &sum);
+ std::vector<std::unique_ptr<ColumnVector>> column_vectors;
+ column_vectors.emplace_back(createColumnVectorGeneric<GenericType>(type, &sum));
+
+ std::unique_ptr<AggregationState> cv_state(
+ aggregation_handle_avg_->accumulateColumnVectors(column_vectors));
+
+ // Test the state generated directly by accumulateColumnVectors(), and also
+ // test after merging back.
+ CheckAvgValue<typename OutputType::cpptype>(
+ static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+ *aggregation_handle_avg_,
+ *cv_state);
+
+ aggregation_handle_avg_->mergeStates(*cv_state, aggregation_handle_avg_state_.get());
+ CheckAvgValue<typename OutputType::cpptype>(
+ static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+ *aggregation_handle_avg_,
+ *aggregation_handle_avg_state_);
+ }
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+ // Same check again, with the column wrapped in a ColumnVectorsValueAccessor
+ // and read as attribute 0.
+ template <typename GenericType, typename OutputType = DoubleType>
+ void checkAggregationAvgGenericValueAccessor() {
+ const GenericType &type = GenericType::Instance(true);
+ initializeHandle(type);
+ EXPECT_TRUE(aggregation_handle_avg_->finalize(*aggregation_handle_avg_state_).isNull());
+
+ typename GenericType::cpptype sum;
+ SetDataType(0, &sum);
+ std::unique_ptr<ColumnVectorsValueAccessor> accessor(new ColumnVectorsValueAccessor());
+ accessor->addColumn(createColumnVectorGeneric<GenericType>(type, &sum));
+
+ std::unique_ptr<AggregationState> va_state(
+ aggregation_handle_avg_->accumulateValueAccessor(accessor.get(),
+ std::vector<attribute_id>(1, 0)));
+
+ // Test the state generated directly by accumulateValueAccessor(), and also
+ // test after merging back.
+ CheckAvgValue<typename OutputType::cpptype>(
+ static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+ *aggregation_handle_avg_,
+ *va_state);
+
+ aggregation_handle_avg_->mergeStates(*va_state, aggregation_handle_avg_state_.get());
+ CheckAvgValue<typename OutputType::cpptype>(
+ static_cast<typename OutputType::cpptype>(sum) / kNumSamples,
+ *aggregation_handle_avg_,
+ *aggregation_handle_avg_state_);
+ }
+#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+
+ std::unique_ptr<AggregationHandle> aggregation_handle_avg_;
+ std::unique_ptr<AggregationState> aggregation_handle_avg_state_;
+ std::unique_ptr<StorageManager> storage_manager_;
+};
+
+const int AggregationHandleAvgTest::kNumSamples;
+
+template <>
+void AggregationHandleAvgTest::CheckAvgValue<double>(
+ double expected,
+ const AggregationHandle &handle,
+ const AggregationState &state) {
+ EXPECT_DOUBLE_EQ(expected, handle.finalize(state).getLiteral<double>());
+}
+
+template <>  // DatetimeIntervalLit specialization: the integer seed goes into its interval_ticks member.
+void AggregationHandleAvgTest::SetDataType<DatetimeIntervalLit>(int value, DatetimeIntervalLit *data) {
+ data->interval_ticks = value;
+}
+
+template <>  // YearMonthIntervalLit specialization: the integer seed goes into its months member.
+void AggregationHandleAvgTest::SetDataType<YearMonthIntervalLit>(int value, YearMonthIntervalLit *data) {
+ data->months = value;
+}
+
+// Same fixture under a name ending in "DeathTest"; the EXPECT_DEATH tests in
+// this file are registered through this alias.
+using AggregationHandleAvgDeathTest = AggregationHandleAvgTest;
+
+TEST_F(AggregationHandleAvgTest, IntTypeTest) {  // This group: checkAggregationAvgGeneric, one test per input type.
+ checkAggregationAvgGeneric<IntType>();
+}
+
+TEST_F(AggregationHandleAvgTest, LongTypeTest) {
+ checkAggregationAvgGeneric<LongType>();
+}
+
+TEST_F(AggregationHandleAvgTest, FloatTypeTest) {
+ checkAggregationAvgGeneric<FloatType>();
+}
+
+TEST_F(AggregationHandleAvgTest, DoubleTypeTest) {
+ checkAggregationAvgGeneric<DoubleType>();
+}
+
+TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeTest) {  // 2nd argument: presumably the output type.
+ checkAggregationAvgGeneric<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeTest) {  // 2nd argument: presumably the output type.
+ checkAggregationAvgGeneric<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+TEST_F(AggregationHandleAvgTest, IntTypeColumnVectorTest) {  // This group: ColumnVector helper, one test per type.
+ checkAggregationAvgGenericColumnVector<IntType>();
+}
+
+TEST_F(AggregationHandleAvgTest, LongTypeColumnVectorTest) {
+ checkAggregationAvgGenericColumnVector<LongType>();
+}
+
+TEST_F(AggregationHandleAvgTest, FloatTypeColumnVectorTest) {
+ checkAggregationAvgGenericColumnVector<FloatType>();
+}
+
+TEST_F(AggregationHandleAvgTest, DoubleTypeColumnVectorTest) {
+ checkAggregationAvgGenericColumnVector<DoubleType>();
+}
+
+TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeColumnVectorTest) {  // Interval in, same interval type out.
+ checkAggregationAvgGenericColumnVector<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeColumnVectorTest) {  // Interval in, same interval type out.
+ checkAggregationAvgGenericColumnVector<YearMonthIntervalType, YearMonthIntervalType>();
+}
+
+#ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+TEST_F(AggregationHandleAvgTest, IntTypeValueAccessorTest) {  // This group: ValueAccessor helper, one test per type.
+ checkAggregationAvgGenericValueAccessor<IntType>();
+}
+
+TEST_F(AggregationHandleAvgTest, LongTypeValueAccessorTest) {
+ checkAggregationAvgGenericValueAccessor<LongType>();
+}
+
+TEST_F(AggregationHandleAvgTest, FloatTypeValueAccessorTest) {
+ checkAggregationAvgGenericValueAccessor<FloatType>();
+}
+
+TEST_F(AggregationHandleAvgTest, DoubleTypeValueAccessorTest) {
+ checkAggregationAvgGenericValueAccessor<DoubleType>();
+}
+
+TEST_F(AggregationHandleAvgTest, DatetimeIntervalTypeValueAccessorTest) {  // OutputType overrides the DoubleType default.
+ checkAggregationAvgGenericValueAccessor<DatetimeIntervalType, DatetimeIntervalType>();
+}
+
+TEST_F(AggregationHandleAvgTest, YearMonthIntervalTypeValueAccessorTest) {  // OutputType overrides the DoubleType default.
+ checkAggregationAvgGenericValueAccessor<YearMonthIntervalType, YearMonthIntervalType>();
+}
+#endif // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
+
+#ifdef QUICKSTEP_DEBUG
+TEST_F(AggregationHandleAvgDeathTest, CharTypeTest) {  // Only built under the enclosing QUICKSTEP_DEBUG guard.
+ const Type &type = CharType::Instance(true, 10);
+ EXPECT_DEATH(initializeHandle(type), "");  // Empty pattern: any death output is accepted.
+}
+
+TEST_F(AggregationHandleAvgDeathTest, VarTypeTest) {  // Only built under the enclosing QUICKSTEP_DEBUG guard.
+ const Type &type = VarCharType::Instance(true, 10);
+ EXPECT_DEATH(initializeHandle(type), "");  // Empty pattern: any death output is accepted.
+}
+
+TEST_F(AggregationHandleAvgDeathTest, WrongTypeTest) {  // Feeds mismatched types to an INT-based AVG handle.
+ const Type &int_non_null_type = IntType::Instance(false);
+ const Type &long_type = LongType::Instance(true);
+ const Type &double_type = DoubleType::Instance(true);
+ const Type &float_type = FloatType::Instance(true);
+ const Type &char_type = CharType::Instance(true, 10);
+ const Type &varchar_type = VarCharType::Instance(true, 10);
+
+ initializeHandle(IntType::Instance(true));  // The fixture's handle and state are built for nullable INT.
+ int int_val = 0;
+ std::int64_t long_val = 0;
+ double double_val = 0;
+ float float_val = 0;
+
+ iterateHandle(aggregation_handle_avg_state_.get(), int_non_null_type.makeValue(&int_val));  // Expected to succeed.
+
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), long_type.makeValue(&long_val)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), double_type.makeValue(&double_val)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), float_type.makeValue(&float_val)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), char_type.makeValue("asdf", 5)), "");
+ EXPECT_DEATH(iterateHandle(aggregation_handle_avg_state_.get(), varchar_type.makeValue("asdf", 5)), "");
+
+ // Test mergeStates() with incorrectly typed handles.
+ std::unique_ptr<AggregationHandle> aggregation_handle_avg_double(
+ AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
+ std::vector<const Type*>(1, &double_type)));
+ std::unique_ptr<AggregationState> aggregation_state_avg_merge_double(
+ aggregation_handle_avg_double->createInitialState());
+ static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_double).iterateUnaryInl(  // One DOUBLE value.
+ static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_double.get()),
+ double_type.makeValue(&double_val));
+ EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_double,  // DOUBLE state into INT.
+ aggregation_handle_avg_state_.get()),
+ "");
+
+ std::unique_ptr<AggregationHandle> aggregation_handle_avg_float(
+ AggregateFunctionFactory::Get(AggregationID::kAvg).createHandle(
+ std::vector<const Type*>(1, &float_type)));
+ std::unique_ptr<AggregationState> aggregation_state_avg_merge_float(
+ aggregation_handle_avg_float->createInitialState());
+ static_cast<const AggregationHandleAvg&>(*aggregation_handle_avg_float).iterateUnaryInl(  // One FLOAT value.
+ static_cast<AggregationStateAvg*>(aggregation_state_avg_merge_float.get()),
+ float_type.makeValue(&float_val));
+ EXPECT_DEATH(aggregation_handle_avg_->mergeStates(*aggregation_state_avg_merge_float,  // FLOAT state into INT.
+ aggregation_handle_avg_state_.get()),
+ "");
+}
+#endif
+
+TEST_F(AggregationHandleAvgTest, canApplyToTypeTest) {  // AVG should accept numeric and interval types only.
+ EXPECT_TRUE(ApplyToTypesTest(kInt));
+ EXPECT_TRUE(ApplyToTypesTest(kLong));
+ EXPECT_TRUE(ApplyToTypesTest(kFloat));
+ EXPECT_TRUE(ApplyToTypesTest(kDouble));
+ EXPECT_FALSE(ApplyToTypesTest(kChar));
+ EXPECT_FALSE(ApplyToTypesTest(kVarChar));
+ EXPECT_FALSE(ApplyToTypesTest(kDatetime));  // A point in time is rejected, unlike the two interval types.
+ EXPECT_TRUE(ApplyToTypesTest(kDatetimeInterval));
+ EXPECT_TRUE(ApplyToTypesTest(kYearMonthInterval));
+}
+
+TEST_F(AggregationHandleAvgTest, ResultTypeForArgumentTypeTest) {  // Pairs are (argument type, expected result).
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kInt, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kLong, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kFloat, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDouble, kDouble));
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kDatetimeInterval, kDatetimeInterval));  // Intervals keep their type.
+ EXPECT_TRUE(ResultTypeForArgumentTypeTest(kYearMonthInterval, kYearMonthInterval));
+}
+
+// Checks mergeGroupByHashTables() for AVG. A source and a destination group-by
+// hash table are each filled with two keys (one shared, one exclusive), the
+// source is merged into the destination, and the destination is then expected
+// to hold three keys: the shared key with the average of both inputs, and each
+// exclusive key with its own value.
+TEST_F(AggregationHandleAvgTest, GroupByTableMergeTestAvg) {
+  const Type &long_non_null_type = LongType::Instance(false);
+  initializeHandle(long_non_null_type);
+  storage_manager_.reset(new StorageManager("./test_avg_data"));
+  std::unique_ptr<AggregationStateHashTableBase> source_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+  std::unique_ptr<AggregationStateHashTableBase> destination_hash_table(
+      aggregation_handle_avg_->createGroupByHashTable(
+          HashTableImplType::kSimpleScalarSeparateChaining,
+          std::vector<const Type *>(1, &long_non_null_type),
+          10,
+          storage_manager_.get()));
+
+  // Typed views of the two tables, needed for putCompositeKey() and
+  // getSingleCompositeKey().
+  AggregationStateHashTable<AggregationStateAvg> *destination_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          destination_hash_table.get());
+
+  AggregationStateHashTable<AggregationStateAvg> *source_hash_table_derived =
+      static_cast<AggregationStateHashTable<AggregationStateAvg> *>(
+          source_hash_table.get());
+
+  AggregationHandleAvg *aggregation_handle_avg_derived =
+      static_cast<AggregationHandleAvg *>(aggregation_handle_avg_.get());
+  // We create three keys: the first is present in both hash tables, the second
+  // is present only in the source hash table, and the third is present only
+  // in the destination hash table.
+  std::vector<TypedValue> common_key;
+  common_key.emplace_back(static_cast<std::int64_t>(0));
+  std::vector<TypedValue> exclusive_source_key, exclusive_destination_key;
+  exclusive_source_key.emplace_back(static_cast<std::int64_t>(1));
+  exclusive_destination_key.emplace_back(static_cast<std::int64_t>(2));
+
+  const std::int64_t common_key_source_avg = 355;
+  TypedValue common_key_source_avg_val(common_key_source_avg);
+
+  const std::int64_t common_key_destination_avg = 295;
+  TypedValue common_key_destination_avg_val(common_key_destination_avg);
+
+  const std::int64_t exclusive_key_source_avg = 1;
+  TypedValue exclusive_key_source_avg_val(exclusive_key_source_avg);
+
+  const std::int64_t exclusive_key_destination_avg = 1;
+  TypedValue exclusive_key_destination_avg_val(exclusive_key_destination_avg);
+
+  std::unique_ptr<AggregationStateAvg> common_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> common_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_source_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+  std::unique_ptr<AggregationStateAvg> exclusive_key_destination_state(
+      static_cast<AggregationStateAvg *>(
+          aggregation_handle_avg_->createInitialState()));
+
+  // Create avg value states for keys (one value is accumulated per state).
+  aggregation_handle_avg_derived->iterateUnaryInl(common_key_source_state.get(),
+                                                  common_key_source_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      common_key_destination_state.get(), common_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_destination_state.get(), exclusive_key_destination_avg_val);
+
+  aggregation_handle_avg_derived->iterateUnaryInl(
+      exclusive_key_source_state.get(), exclusive_key_source_avg_val);
+
+  // Add the key-state pairs to the hash tables.
+  source_hash_table_derived->putCompositeKey(common_key,
+                                             *common_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      common_key, *common_key_destination_state);
+  source_hash_table_derived->putCompositeKey(exclusive_source_key,
+                                             *exclusive_key_source_state);
+  destination_hash_table_derived->putCompositeKey(
+      exclusive_destination_key, *exclusive_key_destination_state);
+
+  EXPECT_EQ(2u, destination_hash_table_derived->numEntries());
+  EXPECT_EQ(2u, source_hash_table_derived->numEntries());
+
+  aggregation_handle_avg_->mergeGroupByHashTables(*source_hash_table,
+                                                  destination_hash_table.get());
+
+  // Two keys from each side with one key shared: three distinct keys.
+  EXPECT_EQ(3u, destination_hash_table_derived->numEntries());
+
+  // Shared key: one value from each table, so the average is their mean.
+  CheckAvgValue<double>(
+      (common_key_destination_avg_val.getLiteral<std::int64_t>() +
+          common_key_source_avg_val.getLiteral<std::int64_t>()) / static_cast<double>(2),
+      *aggregation_handle_avg_derived,
+      *(destination_hash_table_derived->getSingleCompositeKey(common_key)));
+  // Destination-only key: its value is still the one that was inserted.
+  CheckAvgValue<double>(exclusive_key_destination_avg_val.getLiteral<std::int64_t>(),
+                        *aggregation_handle_avg_derived,
+                        *(destination_hash_table_derived->getSingleCompositeKey(
+                            exclusive_destination_key)));
+  // Source-only key: it must be looked up in the DESTINATION table. A lookup
+  // in the source table would succeed whether or not the merge carried the
+  // entry's state over correctly.
+  CheckAvgValue<double>(exclusive_key_source_avg_val.getLiteral<std::int64_t>(),
+                        *aggregation_handle_avg_derived,
+                        *(destination_hash_table_derived->getSingleCompositeKey(
+                            exclusive_source_key)));
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 7e53b9d..a56b714 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -69,6 +69,8 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
quickstep_expressions_predicate_Predicate
quickstep_expressions_scalar_Scalar
quickstep_expressions_scalar_ScalarAttribute
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction_proto
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
quickstep_queryoptimizer_ExecutionHeuristics
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 43d63f9..ce21ade 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -48,6 +48,8 @@
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "expressions/scalar/ScalarAttribute.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.pb.h"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_optimizer/ExecutionHeuristics.hpp"
@@ -1652,7 +1654,7 @@ void ExecutionGenerator::convertWindowAggregate(
// Get input.
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
- window_aggr_state_proto->set_relation_id(input_relation_info->relation->getID());
+ window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
// Get window aggregate function expression.
const E::AliasPtr &named_window_aggregate_expression =
@@ -1713,6 +1715,7 @@ void ExecutionGenerator::convertWindowAggregate(
const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
execution_plan_->addRelationalOperator(
new WindowAggregationOperator(query_handle_->query_id(),
+ *input_relation_info->relation,
*output_relation,
window_aggr_state_index,
insert_destination_index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 08d7df5..d12644a 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -304,7 +304,7 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
quickstep_utility_Macros)
target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
glog
- quickstep_expressions_aggregation_AggregateFunction
+ quickstep_expressions_windowaggregation_WindowAggregateFunction
quickstep_queryoptimizer_OptimizerTree
quickstep_queryoptimizer_expressions_AttributeReference
quickstep_queryoptimizer_expressions_Expression
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
index 7b1f304..be5db59 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.cpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -22,7 +22,7 @@
#include <utility>
#include <vector>
-#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/window_aggregation/WindowAggregateFunction.hpp"
#include "query_optimizer/expressions/AttributeReference.hpp"
#include "query_optimizer/expressions/Expression.hpp"
#include "query_optimizer/expressions/PatternMatcher.hpp"
@@ -59,7 +59,7 @@ const Type& WindowAggregateFunction::getValueType() const {
}
WindowAggregateFunctionPtr WindowAggregateFunction::Create(
- const ::quickstep::AggregateFunction &window_aggregate,
+ const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/29664050/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
index 0bee28f..279dfa1 100644
--- a/query_optimizer/expressions/WindowAggregateFunction.hpp
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -33,7 +33,7 @@
namespace quickstep {
-class AggregateFunction;
+class WindowAggregateFunction;
class Type;
namespace optimizer {
@@ -140,7 +140,7 @@ class WindowAggregateFunction : public Expression {
* @return The WindowAggregateFunction singleton (from the expression system)
* for this node.
**/
- inline const ::quickstep::AggregateFunction& window_aggregate() const {
+ inline const ::quickstep::WindowAggregateFunction& window_aggregate() const {
return window_aggregate_;
}
@@ -185,7 +185,7 @@ class WindowAggregateFunction : public Expression {
* @param is_distinct Whether this is a DISTINCT aggregation.
* @return A new WindowAggregateFunctionPtr.
**/
- static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+ static WindowAggregateFunctionPtr Create(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -209,7 +209,7 @@ class WindowAggregateFunction : public Expression {
* @param window_info The window info of the window aggregate function.
* @param is_distinct Indicates whether this is a DISTINCT aggregation.
*/
- WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+ WindowAggregateFunction(const ::quickstep::WindowAggregateFunction &window_aggregate,
const std::vector<ScalarPtr> &arguments,
const WindowInfo &window_info,
const std::string &window_name,
@@ -228,7 +228,7 @@ class WindowAggregateFunction : public Expression {
// window_aggregate_. If it really needs to be separated from the
// AggregationFunction, a new class for WindowAggregationFunction should be
// created as quickstep::WindowAggregateFunction.
- const ::quickstep::AggregateFunction &window_aggregate_;
+ const ::quickstep::WindowAggregateFunction &window_aggregate_;
std::vector<ScalarPtr> arguments_;
const WindowInfo window_info_;
const std::string window_name_;