You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/25 14:40:34 UTC
[01/11] incubator-quickstep git commit: Added Window Aggregation
Function in optimizer. [Forced Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/scheduler++ 688eb25c5 -> 2a0def654 (forced update)
Added Window Aggregation Function in optimizer.
- The resolver could understand optional window clause w/ aggregation functions.
- Only one window aggregation function is allowed.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/714874ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/714874ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/714874ce
Branch: refs/heads/scheduler++
Commit: 714874ce54e12972285a43f92784ef6954a8b6fd
Parents: d642891
Author: shixuan <sh...@wisc.edu>
Authored: Tue Jun 21 20:08:52 2016 +0000
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Jun 24 18:59:47 2016 -0700
----------------------------------------------------------------------
query_optimizer/expressions/CMakeLists.txt | 17 +-
query_optimizer/expressions/ExpressionType.hpp | 3 +-
query_optimizer/expressions/PatternMatcher.hpp | 3 +
.../expressions/WindowAggregateFunction.cpp | 193 ++++++++++++
.../expressions/WindowAggregateFunction.hpp | 246 +++++++++++++++
query_optimizer/logical/CMakeLists.txt | 18 +-
query_optimizer/logical/LogicalType.hpp | 3 +-
query_optimizer/logical/PatternMatcher.hpp | 2 +
query_optimizer/logical/WindowAggregate.cpp | 85 +++++
query_optimizer/logical/WindowAggregate.hpp | 123 ++++++++
query_optimizer/resolver/CMakeLists.txt | 2 +
query_optimizer/resolver/Resolver.cpp | 314 ++++++++++++++++++-
query_optimizer/resolver/Resolver.hpp | 66 +++-
query_optimizer/strategy/CMakeLists.txt | 3 +-
query_optimizer/strategy/OneToOne.cpp | 5 +
.../tests/logical_generator/Select.test | 162 ++++++++++
query_optimizer/tests/resolver/Select.test | 162 ++++++++++
17 files changed, 1387 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 6c40741..08d7df5 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -43,6 +43,7 @@ add_library(quickstep_queryoptimizer_expressions_SearchedCase SearchedCase.cpp S
add_library(quickstep_queryoptimizer_expressions_SimpleCase SimpleCase.cpp SimpleCase.hpp)
add_library(quickstep_queryoptimizer_expressions_SubqueryExpression SubqueryExpression.cpp SubqueryExpression.hpp)
add_library(quickstep_queryoptimizer_expressions_UnaryExpression UnaryExpression.cpp UnaryExpression.hpp)
+add_library(quickstep_queryoptimizer_expressions_WindowAggregateFunction WindowAggregateFunction.cpp WindowAggregateFunction.hpp)
# Link dependencies:
target_link_libraries(quickstep_queryoptimizer_expressions_AggregateFunction
@@ -301,6 +302,19 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
quickstep_types_operations_unaryoperations_UnaryOperation
quickstep_types_operations_unaryoperations_UnaryOperationID
quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
+ glog
+ quickstep_expressions_aggregation_AggregateFunction
+ quickstep_queryoptimizer_OptimizerTree
+ quickstep_queryoptimizer_expressions_AttributeReference
+ quickstep_queryoptimizer_expressions_Expression
+ quickstep_queryoptimizer_expressions_ExpressionType
+ quickstep_queryoptimizer_expressions_PatternMatcher
+ quickstep_queryoptimizer_expressions_Scalar
+ quickstep_types_Type
+ quickstep_utility_Cast
+ quickstep_utility_Macros)
+
# Module all-in-one library:
add_library(quickstep_queryoptimizer_expressions ../../empty_src.cpp OptimizerExpressionsModule.hpp)
@@ -330,4 +344,5 @@ target_link_libraries(quickstep_queryoptimizer_expressions
quickstep_queryoptimizer_expressions_SearchedCase
quickstep_queryoptimizer_expressions_SimpleCase
quickstep_queryoptimizer_expressions_SubqueryExpression
- quickstep_queryoptimizer_expressions_UnaryExpression)
+ quickstep_queryoptimizer_expressions_UnaryExpression
+ quickstep_queryoptimizer_expressions_WindowAggregateFunction)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/ExpressionType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/ExpressionType.hpp b/query_optimizer/expressions/ExpressionType.hpp
index 23770e0..77e0874 100644
--- a/query_optimizer/expressions/ExpressionType.hpp
+++ b/query_optimizer/expressions/ExpressionType.hpp
@@ -49,7 +49,8 @@ enum class ExpressionType {
kSearchedCase,
kSimpleCase,
kSubqueryExpression,
- kUnaryExpression
+ kUnaryExpression,
+ kWindowAggregateFunction
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/PatternMatcher.hpp b/query_optimizer/expressions/PatternMatcher.hpp
index 87bc52a..2cc91d6 100644
--- a/query_optimizer/expressions/PatternMatcher.hpp
+++ b/query_optimizer/expressions/PatternMatcher.hpp
@@ -52,6 +52,7 @@ class Scalar;
class ScalarLiteral;
class Sum;
class UnaryExpression;
+class WindowAggregateFunction;
/** \addtogroup OptimizerExpressions
* @{
@@ -155,6 +156,8 @@ using SomeScalar = SomeExpressionNode<Scalar,
ExpressionType::kUnaryExpression>;
using SomeScalarLiteral = SomeExpressionNode<ScalarLiteral, ExpressionType::kScalarLiteral>;
using SomeUnaryExpression = SomeExpressionNode<UnaryExpression, ExpressionType::kUnaryExpression>;
+using SomeWindowAggregateFunction = SomeExpressionNode<WindowAggregateFunction,
+ ExpressionType::kWindowAggregateFunction>;
using SomeAggregateFunction = SomeExpressionNode<AggregateFunction,
ExpressionType::kAggregateFunction>;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
new file mode 100644
index 0000000..7b1f304
--- /dev/null
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -0,0 +1,193 @@
+/**
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "types/Type.hpp"
+#include "utility/Cast.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace expressions {
+
+bool WindowAggregateFunction::isNullable() const {
+  // Nullability is a property of the result type, which getValueType()
+  // resolves from the types of the arguments.
+  const Type &result_type = getValueType();
+  return result_type.isNullable();
+}
+
+const Type& WindowAggregateFunction::getValueType() const {
+  // Collect the value type of each argument, preserving argument order.
+  std::vector<const Type*> types_of_arguments;
+  for (const ScalarPtr &arg : arguments_) {
+    const Type &arg_type = arg->getValueType();
+    types_of_arguments.push_back(&arg_type);
+  }
+
+  // Ask the underlying aggregate which type it yields for these inputs.
+  const Type *return_type =
+      window_aggregate_.resultTypeForArgumentTypes(types_of_arguments);
+  DCHECK(return_type != nullptr);
+  return *return_type;
+}
+
+WindowAggregateFunctionPtr WindowAggregateFunction::Create(
+    const ::quickstep::AggregateFunction &window_aggregate,
+    const std::vector<ScalarPtr> &arguments,
+    const WindowInfo &window_info,
+    const std::string &window_name,
+    const bool is_distinct) {
+#ifdef QUICKSTEP_DEBUG
+  // Debug-only sanity check: the aggregate must accept the argument types.
+  std::vector<const Type*> argument_types;
+  for (const ScalarPtr &arg : arguments) {
+    const Type &arg_type = arg->getValueType();
+    argument_types.push_back(&arg_type);
+  }
+  DCHECK(window_aggregate.canApplyToTypes(argument_types));
+#endif  // QUICKSTEP_DEBUG
+
+  // The constructor is private, so the node is allocated here and handed
+  // straight to the shared pointer.
+  return WindowAggregateFunctionPtr(
+      new WindowAggregateFunction(
+          window_aggregate, arguments, window_info, window_name, is_distinct));
+}
+
+ExpressionPtr WindowAggregateFunction::copyWithNewChildren(
+    const std::vector<ExpressionPtr> &new_children) const {
+  // The only children this node registers are its arguments, so every
+  // replacement child must be a Scalar; anything else is a fatal error.
+  std::vector<ScalarPtr> scalar_arguments;
+  for (const ExpressionPtr &expression_ptr : new_children) {
+    ScalarPtr expr_as_scalar;
+    CHECK(SomeScalar::MatchesWithConditionalCast(expression_ptr, &expr_as_scalar))
+        << expression_ptr->toString();
+    scalar_arguments.push_back(std::move(expr_as_scalar));
+  }
+
+  // The aggregate, window definition, window name and DISTINCT flag carry
+  // over unchanged.
+  return WindowAggregateFunctionPtr(
+      new WindowAggregateFunction(
+          window_aggregate_, scalar_arguments, window_info_, window_name_, is_distinct_));
+}
+
+std::vector<AttributeReferencePtr> WindowAggregateFunction::getReferencedAttributes() const {
+  std::vector<AttributeReferencePtr> result;
+
+  // Attributes referenced by the arguments come first, in argument order.
+  for (const ScalarPtr &arg : arguments_) {
+    for (const AttributeReferencePtr &attribute : arg->getReferencedAttributes()) {
+      result.push_back(attribute);
+    }
+  }
+
+  // Then the partition keys, followed by the order keys of the window.
+  const std::vector<AttributeReferencePtr> &partition_keys =
+      window_info_.partition_by_attributes;
+  result.insert(result.end(), partition_keys.begin(), partition_keys.end());
+
+  const std::vector<AttributeReferencePtr> &order_keys =
+      window_info_.order_by_attributes;
+  result.insert(result.end(), order_keys.begin(), order_keys.end());
+
+  return result;
+}
+
+namespace {
+
+// Renders a list of flags as "[true,false,...]"; an empty list yields "[]".
+std::string BoolVectorToString(const std::vector<bool> &flags) {
+  std::string rendered("[");
+  for (const bool flag : flags) {
+    rendered.append(flag ? "true," : "false,");
+  }
+  if (!flags.empty()) {
+    // Drop the trailing comma left behind by the last element.
+    rendered.pop_back();
+  }
+  rendered.append("]");
+  return rendered;
+}
+
+}  // namespace
+
+/**
+ * Fills in the printable fields of this node.
+ *
+ * Inline fields, in order: "function", "window_name", "is_ascending",
+ * "nulls_first", then "frame_mode" / "num_preceding" / "num_following" when
+ * an explicit frame is present, and "is_distinct" only for DISTINCT
+ * aggregates. Container child fields, in order: "arguments", "partition_by",
+ * "order_by". The non-container child outputs are left untouched.
+ */
+void WindowAggregateFunction::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("function");
+  inline_field_values->push_back(window_aggregate_.getName());
+
+  container_child_field_names->push_back("arguments");
+  container_child_fields->emplace_back(CastSharedPtrVector<OptimizerTreeBase>(arguments_));
+
+  inline_field_names->push_back("window_name");
+  inline_field_values->push_back(window_name_);
+
+  container_child_field_names->push_back("partition_by");
+  container_child_fields->emplace_back(
+      CastSharedPtrVector<OptimizerTreeBase>(window_info_.partition_by_attributes));
+
+  container_child_field_names->push_back("order_by");
+  container_child_fields->emplace_back(
+      CastSharedPtrVector<OptimizerTreeBase>(window_info_.order_by_attributes));
+
+  // One flag per order key, rendered as a bracketed comma-separated list.
+  inline_field_names->push_back("is_ascending");
+  inline_field_values->push_back(BoolVectorToString(window_info_.order_by_directions));
+
+  inline_field_names->push_back("nulls_first");
+  inline_field_values->push_back(BoolVectorToString(window_info_.nulls_first));
+
+  // Frame fields are printed only when a frame was explicitly defined.
+  if (window_info_.frame_info != nullptr) {
+    const WindowFrameInfo *frame_info = window_info_.frame_info;
+
+    inline_field_names->push_back("frame_mode");
+    inline_field_values->push_back(frame_info->is_row ? "row" : "range");
+
+    inline_field_names->push_back("num_preceding");
+    inline_field_values->push_back(std::to_string(frame_info->num_preceding));
+
+    inline_field_names->push_back("num_following");
+    inline_field_values->push_back(std::to_string(frame_info->num_following));
+  }
+
+  if (is_distinct_) {
+    inline_field_names->push_back("is_distinct");
+    inline_field_values->push_back("true");
+  }
+}
+
+} // namespace expressions
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
new file mode 100644
index 0000000..0bee28f
--- /dev/null
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -0,0 +1,246 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_WINDOW_AGGREGATE_FUNCTION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_WINDOW_AGGREGATE_FUNCTION_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregateFunction;
+class Type;
+
+namespace optimizer {
+namespace expressions {
+
+/** \addtogroup OptimizerExpressions
+ * @{
+ */
+
+/**
+ * @brief Describes an explicit window frame: its mode (ROWS or RANGE) and the
+ * number of preceding and following tuples it covers.
+ **/
+struct WindowFrameInfo {
+ /**
+ * @brief Constructor.
+ *
+ * @param is_row_in True if this window frame is defined by ROWS, false if
+ * defined by RANGE.
+ * @param num_preceding_in The number of preceding tuples the window frame
+ * will cover, -1 means UNBOUNDED.
+ * @param num_following_in The number of following tuples the window frame
+ * will cover, -1 means UNBOUNDED.
+ **/
+ WindowFrameInfo(const bool is_row_in,
+ const int num_preceding_in,
+ const int num_following_in)
+ : is_row(is_row_in),
+ num_preceding(num_preceding_in),
+ num_following(num_following_in) {}
+
+ // True for a ROWS frame, false for a RANGE frame.
+ const bool is_row;
+ // Number of preceding tuples covered by the frame; -1 means UNBOUNDED.
+ const int num_preceding;
+ // Number of following tuples covered by the frame; -1 means UNBOUNDED.
+ const int num_following;
+};
+
+/**
+ * @brief Bundles the definition of a window: partition keys, order keys with
+ * their per-key flags, and an optional explicit frame.
+ **/
+struct WindowInfo {
+ /**
+ * @brief Constructor.
+ *
+ * @param partition_by_attributes_in The partition keys for the window.
+ * @param order_by_attributes_in The order keys for the window.
+ * @param order_by_directions_in The order direction for order key.
+ * @param nulls_first_in The nulls' position for order key.
+ * @param frame_info_in The window frame information for the window. Null
+ * means there is no explicit window frame definition.
+ **/
+ WindowInfo(const std::vector<AttributeReferencePtr> &partition_by_attributes_in,
+ const std::vector<AttributeReferencePtr> &order_by_attributes_in,
+ const std::vector<bool> &order_by_directions_in,
+ const std::vector<bool> &nulls_first_in,
+ const WindowFrameInfo *frame_info_in)
+ : partition_by_attributes(partition_by_attributes_in),
+ order_by_attributes(order_by_attributes_in),
+ order_by_directions(order_by_directions_in),
+ nulls_first(nulls_first_in),
+ frame_info(frame_info_in) {}
+
+ // Partition keys of the window.
+ const std::vector<AttributeReferencePtr> partition_by_attributes;
+ // Order keys of the window.
+ const std::vector<AttributeReferencePtr> order_by_attributes;
+ // Order direction flags; a true entry is printed as ascending.
+ // Presumably one entry per order key -- TODO confirm against the resolver.
+ const std::vector<bool> order_by_directions;
+ // Nulls-position flags for the order keys; true is printed as nulls first.
+ const std::vector<bool> nulls_first;
+ // Explicit frame definition, or null when the window has none.
+ // NOTE(review): stored as a raw pointer and copied shallowly with this
+ // struct; ownership is not visible here -- confirm who deletes it.
+ const WindowFrameInfo *frame_info;
+};
+
+class WindowAggregateFunction;
+typedef std::shared_ptr<const WindowAggregateFunction> WindowAggregateFunctionPtr;
+
+/**
+ * @brief Represents a window aggregate function and its arguments in the
+ *        optimizer. This class wraps some of the functionality from
+ *        quickstep::AggregateFunction and represents a particular instance
+ *        of an aggregate, together with the window it is evaluated over,
+ *        during query optimization.
+ **/
+class WindowAggregateFunction : public Expression {
+ public:
+  /**
+   * @brief Destructor.
+   */
+  ~WindowAggregateFunction() override {}
+
+  ExpressionType getExpressionType() const override {
+    return ExpressionType::kWindowAggregateFunction;
+  }
+
+  std::string getName() const override {
+    return "WindowAggregateFunction";
+  }
+
+  const Type& getValueType() const override;
+
+  bool isConstant() const override {
+    // Window aggregate function is never considered as a constant expression.
+    return false;
+  }
+
+  ExpressionPtr copyWithNewChildren(
+      const std::vector<ExpressionPtr> &new_children) const override;
+
+  std::vector<AttributeReferencePtr> getReferencedAttributes() const override;
+
+  /**
+   * @return Whether the type of the return value is nullable.
+   **/
+  bool isNullable() const;
+
+  /**
+   * @return The AggregateFunction singleton (from the expression system)
+   *         for this node.
+   **/
+  inline const ::quickstep::AggregateFunction& window_aggregate() const {
+    return window_aggregate_;
+  }
+
+  /**
+   * @return The list of scalar arguments to this aggregate.
+   **/
+  inline const std::vector<ScalarPtr>& arguments() const {
+    return arguments_;
+  }
+
+  /**
+   * @return The window info of this window aggregate function. Returned by
+   *         reference so that reading it does not copy the key vectors; the
+   *         reference is valid for as long as this node is alive.
+   **/
+  inline const WindowInfo& window_info() const {
+    return window_info_;
+  }
+
+  /**
+   * @return The name of the window. Returned by reference; valid for as
+   *         long as this node is alive.
+   **/
+  inline const std::string& window_name() const {
+    return window_name_;
+  }
+
+  /**
+   * @return Whether this is a DISTINCT aggregation.
+   **/
+  inline bool is_distinct() const {
+    return is_distinct_;
+  }
+
+  /**
+   * @brief Create a new WindowAggregateFunction by directly defined window.
+   *
+   * @warning It is an error to call this with arguments that the given
+   *          aggregate can not apply to.
+   *
+   * @param window_aggregate The underlying AggregateFunction from the
+   *        expression system.
+   * @param arguments A list of arguments to the window aggregate function.
+   * @param window_info The window info of the window aggregate function.
+   * @param window_name The name of the window.
+   * @param is_distinct Whether this is a DISTINCT aggregation.
+   * @return A new WindowAggregateFunctionPtr.
+   **/
+  static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+                                           const std::vector<ScalarPtr> &arguments,
+                                           const WindowInfo &window_info,
+                                           const std::string &window_name,
+                                           const bool is_distinct);
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  /**
+   * @brief Constructor.
+   *
+   * @param window_aggregate The actual AggregateFunction to use.
+   * @param arguments A list of arguments to the window aggregate function.
+   * @param window_info The window info of the window aggregate function.
+   * @param window_name The name of the window.
+   * @param is_distinct Indicates whether this is a DISTINCT aggregation.
+   */
+  WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+                          const std::vector<ScalarPtr> &arguments,
+                          const WindowInfo &window_info,
+                          const std::string &window_name,
+                          const bool is_distinct)
+      : window_aggregate_(window_aggregate),
+        arguments_(arguments),
+        window_info_(window_info),
+        window_name_(window_name),
+        is_distinct_(is_distinct) {
+    // Only the arguments are registered as children of this node; the
+    // attributes inside window_info_ are not.
+    for (const ScalarPtr &child : arguments_) {
+      addChild(child);
+    }
+  }
+
+  // TODO(Shixuan): Currently this class uses AggregationFunction as
+  // window_aggregate_. If it really needs to be separated from the
+  // AggregationFunction, a new class for WindowAggregationFunction should be
+  // created as quickstep::WindowAggregateFunction.
+  const ::quickstep::AggregateFunction &window_aggregate_;
+  std::vector<ScalarPtr> arguments_;
+  const WindowInfo window_info_;
+  const std::string window_name_;
+  const bool is_distinct_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunction);
+};
+
+/** @} */
+
+} // namespace expressions
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_WINDOW_AGGREGATE_FUNCTION_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CMakeLists.txt b/query_optimizer/logical/CMakeLists.txt
index 61c6234..b787c60 100644
--- a/query_optimizer/logical/CMakeLists.txt
+++ b/query_optimizer/logical/CMakeLists.txt
@@ -43,6 +43,7 @@ add_library(quickstep_queryoptimizer_logical_TableReference TableReference.cpp T
add_library(quickstep_queryoptimizer_logical_TableGenerator ../../empty_src.cpp TableGenerator.hpp)
add_library(quickstep_queryoptimizer_logical_TopLevelPlan TopLevelPlan.cpp TopLevelPlan.hpp)
add_library(quickstep_queryoptimizer_logical_UpdateTable UpdateTable.cpp UpdateTable.hpp)
+add_library(quickstep_queryoptimizer_logical_WindowAggregate WindowAggregate.cpp WindowAggregate.hpp)
# Link dependencies:
target_link_libraries(quickstep_queryoptimizer_logical_Aggregate
@@ -259,6 +260,20 @@ target_link_libraries(quickstep_queryoptimizer_logical_UpdateTable
quickstep_queryoptimizer_logical_LogicalType
quickstep_utility_Cast
quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_logical_WindowAggregate
+ glog
+ quickstep_queryoptimizer_expressions_Alias
+ quickstep_queryoptimizer_expressions_AttributeReference
+ quickstep_queryoptimizer_expressions_Expression
+ quickstep_queryoptimizer_expressions_ExpressionUtil
+ quickstep_queryoptimizer_expressions_NamedExpression
+ quickstep_queryoptimizer_expressions_PatternMatcher
+ quickstep_queryoptimizer_logical_Logical
+ quickstep_queryoptimizer_logical_LogicalType
+ quickstep_queryoptimizer_OptimizerTree
+ quickstep_utility_Cast
+ quickstep_utility_Macros)
+
# Module all-in-one library:
add_library(quickstep_queryoptimizer_logical ../../empty_src.cpp OptimizerLogicalModule.hpp)
@@ -287,4 +302,5 @@ target_link_libraries(quickstep_queryoptimizer_logical
quickstep_queryoptimizer_logical_TableGenerator
quickstep_queryoptimizer_logical_TableReference
quickstep_queryoptimizer_logical_TopLevelPlan
- quickstep_queryoptimizer_logical_UpdateTable)
+ quickstep_queryoptimizer_logical_UpdateTable
+ quickstep_queryoptimizer_logical_WindowAggregate)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/LogicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/LogicalType.hpp b/query_optimizer/logical/LogicalType.hpp
index 1b9366e..c82fb47 100644
--- a/query_optimizer/logical/LogicalType.hpp
+++ b/query_optimizer/logical/LogicalType.hpp
@@ -49,7 +49,8 @@ enum class LogicalType {
kTableGenerator,
kTableReference,
kTopLevelPlan,
- kUpdateTable
+ kUpdateTable,
+ kWindowAggregate
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/PatternMatcher.hpp b/query_optimizer/logical/PatternMatcher.hpp
index ff8f3d0..de8609e 100644
--- a/query_optimizer/logical/PatternMatcher.hpp
+++ b/query_optimizer/logical/PatternMatcher.hpp
@@ -45,6 +45,7 @@ class Sort;
class TableReference;
class TopLevelPlan;
class UpdateTable;
+class WindowAggregate;
/** \addtogroup OptimizerLogical
* @{
@@ -130,6 +131,7 @@ using SomeSort = SomeLogicalNode<Sort, LogicalType::kSort>;
using SomeTableReference = SomeLogicalNode<TableReference, LogicalType::kTableReference>;
using SomeTopLevelPlan = SomeLogicalNode<TopLevelPlan, LogicalType::kTopLevelPlan>;
using SomeUpdateTable = SomeLogicalNode<UpdateTable, LogicalType::kUpdateTable>;
+using SomeWindowAggregate = SomeLogicalNode<WindowAggregate, LogicalType::kWindowAggregate>;
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/WindowAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/WindowAggregate.cpp b/query_optimizer/logical/WindowAggregate.cpp
new file mode 100644
index 0000000..0d747b6
--- /dev/null
+++ b/query_optimizer/logical/WindowAggregate.cpp
@@ -0,0 +1,85 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_optimizer/logical/WindowAggregate.hpp"
+
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "utility/Cast.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+LogicalPtr WindowAggregate::copyWithNewChildren(
+    const std::vector<LogicalPtr> &new_children) const {
+  DCHECK_EQ(getNumChildren(), new_children.size());
+  // The first new child becomes the input; the window aggregate expression
+  // is reused as is.
+  const LogicalPtr &new_input = new_children[0];
+  return Create(new_input, window_aggregate_expression_);
+}
+
+std::vector<E::AttributeReferencePtr> WindowAggregate::getOutputAttributes() const {
+  // Everything the input produces, followed by a reference to the window
+  // aggregate expression.
+  std::vector<E::AttributeReferencePtr> attributes = input_->getOutputAttributes();
+  const E::AttributeReferencePtr window_aggregate_ref = E::ToRef(window_aggregate_expression_);
+  attributes.push_back(window_aggregate_ref);
+  return attributes;
+}
+
+std::vector<E::AttributeReferencePtr> WindowAggregate::getReferencedAttributes() const {
+  // This node references exactly what its window aggregate expression
+  // references.
+  std::vector<E::AttributeReferencePtr> referenced =
+      window_aggregate_expression_->getReferencedAttributes();
+  return referenced;
+}
+
+/**
+ * Rebuilds this node around a replacement window aggregate expression while
+ * keeping the current input.
+ *
+ * @param input_expressions Must hold exactly one expression, and that
+ *        expression must be an Alias.
+ * @return A new WindowAggregate over the same input.
+ */
+LogicalPtr WindowAggregate::copyWithNewInputExpressions(
+    const std::vector<E::ExpressionPtr> &input_expressions) const {
+  // Only one expression needed
+  DCHECK_EQ(1u, input_expressions.size());
+
+  // Fail loudly on a non-Alias input: an unchecked failed cast would leave
+  // the pointer null and hand that null on to Create().
+  E::AliasPtr window_aggregate_expression;
+  CHECK(E::SomeAlias::MatchesWithConditionalCast(input_expressions[0],
+                                                 &window_aggregate_expression))
+      << input_expressions[0]->toString();
+
+  return Create(input_, window_aggregate_expression);
+}
+
+void WindowAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  // Both printable fields are single (non-container) children: the input
+  // plan first, then the window aggregate expression.
+  non_container_child_field_names->emplace_back("input");
+  non_container_child_field_names->emplace_back("window_aggregate_expression");
+
+  non_container_child_fields->push_back(input_);
+  non_container_child_fields->push_back(window_aggregate_expression_);
+}
+
+} // namespace logical
+} // namespace optimizer
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/WindowAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/WindowAggregate.hpp b/query_optimizer/logical/WindowAggregate.hpp
new file mode 100644
index 0000000..dcd9a7d
--- /dev/null
+++ b/query_optimizer/logical/WindowAggregate.hpp
@@ -0,0 +1,123 @@
+/**
+ * Copyright 2011-2015 Quickstep Technologies LLC.
+ * Copyright 2015 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_WINDOW_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_WINDOW_AGGREGATE_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/logical/LogicalType.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+/** \addtogroup OptimizerLogical
+ * @{
+ */
+
+class WindowAggregate;
+typedef std::shared_ptr<const WindowAggregate> WindowAggregatePtr;
+
+/**
+ * @brief Window aggregate operator that computes window aggregate expressions.
+ */
+class WindowAggregate : public Logical {
+ public:
+ LogicalType getLogicalType() const override {
+ return LogicalType::kWindowAggregate;
+ }
+
+ std::string getName() const override { return "WindowAggregate"; }
+
+ /**
+ * @return The input logical node.
+ */
+ const LogicalPtr& input() const { return input_; }
+
+ /**
+ * @return The alias expression for the window aggregate computed by this
+ * node.
+ */
+ const expressions::AliasPtr window_aggregate_expression() const {
+ return window_aggregate_expression_;
+ }
+
+ /**
+ * @brief Creates a copy of this node with a new input; the window aggregate
+ * expression is kept.
+ */
+ LogicalPtr copyWithNewChildren(
+ const std::vector<LogicalPtr> &new_children) const override;
+
+ /**
+ * @brief Creates a copy of this node with a new window aggregate expression;
+ * the input is kept. Exactly one input expression is expected.
+ */
+ LogicalPtr copyWithNewInputExpressions(
+ const std::vector<expressions::ExpressionPtr> &input_expressions) const override;
+
+ /**
+ * @return The output attributes of the input followed by a reference to the
+ * window aggregate expression.
+ */
+ std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+ /**
+ * @return The attributes referenced by the window aggregate expression.
+ */
+ std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+ /**
+ * @brief Creates a WindowAggregate logical node.
+ *
+ * @param input The input node.
+ * @param window_aggregate_expression The window aggregate expression.
+ * @return An immutable WindowAggregate node.
+ */
+ static WindowAggregatePtr Create(
+ const LogicalPtr &input,
+ const expressions::AliasPtr &window_aggregate_expression) {
+ return WindowAggregatePtr(new WindowAggregate(input, window_aggregate_expression));
+ }
+
+ protected:
+ void getFieldStringItems(
+ std::vector<std::string> *inline_field_names,
+ std::vector<std::string> *inline_field_values,
+ std::vector<std::string> *non_container_child_field_names,
+ std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+ std::vector<std::string> *container_child_field_names,
+ std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+ // Private: instances are built through Create(). The input is registered as
+ // the child and the alias as the input expression of this node.
+ WindowAggregate(const LogicalPtr &input,
+ const expressions::AliasPtr &window_aggregate_expression)
+ : input_(input),
+ window_aggregate_expression_(window_aggregate_expression) {
+ addChild(input_);
+ addInputExpression(window_aggregate_expression_);
+ }
+
+ const LogicalPtr input_;
+ const expressions::AliasPtr window_aggregate_expression_;
+
+ DISALLOW_COPY_AND_ASSIGN(WindowAggregate);
+};
+
+/** @} */
+
+} // namespace logical
+} // namespace optimizer
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_WINDOW_AGGREGATE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index dc7eac0..fb75767 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -89,6 +89,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
quickstep_queryoptimizer_expressions_SimpleCase
quickstep_queryoptimizer_expressions_SubqueryExpression
quickstep_queryoptimizer_expressions_UnaryExpression
+ quickstep_queryoptimizer_expressions_WindowAggregateFunction
quickstep_queryoptimizer_logical_Aggregate
quickstep_queryoptimizer_logical_CopyFrom
quickstep_queryoptimizer_logical_CreateIndex
@@ -109,6 +110,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
quickstep_queryoptimizer_logical_TableReference
quickstep_queryoptimizer_logical_TopLevelPlan
quickstep_queryoptimizer_logical_UpdateTable
+ quickstep_queryoptimizer_logical_WindowAggregate
quickstep_queryoptimizer_resolver_NameResolver
quickstep_storage_StorageBlockLayout_proto
quickstep_storage_StorageConstants
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index ffc173a..f880ce7 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -85,6 +85,7 @@
#include "query_optimizer/expressions/SimpleCase.hpp"
#include "query_optimizer/expressions/SubqueryExpression.hpp"
#include "query_optimizer/expressions/UnaryExpression.hpp"
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
#include "query_optimizer/logical/Aggregate.hpp"
#include "query_optimizer/logical/CopyFrom.hpp"
#include "query_optimizer/logical/CreateIndex.hpp"
@@ -104,6 +105,7 @@
#include "query_optimizer/logical/TableReference.hpp"
#include "query_optimizer/logical/TopLevelPlan.hpp"
#include "query_optimizer/logical/UpdateTable.hpp"
+#include "query_optimizer/logical/WindowAggregate.hpp"
#include "query_optimizer/resolver/NameResolver.hpp"
#include "storage/StorageBlockLayout.pb.h"
#include "storage/StorageConstants.hpp"
@@ -164,9 +166,11 @@ struct Resolver::ExpressionResolutionInfo {
*/
ExpressionResolutionInfo(const NameResolver &name_resolver_in,
QueryAggregationInfo *query_aggregation_info_in,
+ WindowAggregationInfo *window_aggregation_info_in,
SelectListInfo *select_list_info_in)
: name_resolver(name_resolver_in),
query_aggregation_info(query_aggregation_info_in),
+ window_aggregation_info(window_aggregation_info_in),
select_list_info(select_list_info_in) {}
/**
@@ -180,6 +184,7 @@ struct Resolver::ExpressionResolutionInfo {
: name_resolver(parent.name_resolver),
not_allow_aggregate_here(parent.not_allow_aggregate_here),
query_aggregation_info(parent.query_aggregation_info),
+ window_aggregation_info(parent.window_aggregation_info),
select_list_info(parent.select_list_info) {}
/**
@@ -187,16 +192,29 @@ struct Resolver::ExpressionResolutionInfo {
*/
bool hasAggregate() const { return parse_aggregate_expression != nullptr; }
+ /**
+ * @return True if the expression contains a window aggregate function.
+ **/
+ bool hasWindowAggregate() const {
+ return parse_window_aggregate_expression != nullptr;
+ }
+
const NameResolver &name_resolver;
// Empty if aggregations are allowed.
const std::string not_allow_aggregate_here;
// Can be NULL if aggregations are not allowed.
QueryAggregationInfo *query_aggregation_info = nullptr;
+
+ // Window map and collected window aggregate aliases for the current query.
+ // NULL when no window aggregation info was supplied.
+ WindowAggregationInfo *window_aggregation_info = nullptr;
+
// Can be NULL if alias references to SELECT-list expressions are not allowed.
SelectListInfo *select_list_info = nullptr;
// The first aggregate in the expression.
const ParseTreeNode *parse_aggregate_expression = nullptr;
+ // The first window aggregate in the expression.
+ const ParseTreeNode *parse_window_aggregate_expression = nullptr;
};
struct Resolver::QueryAggregationInfo {
@@ -209,6 +227,26 @@ struct Resolver::QueryAggregationInfo {
std::vector<E::AliasPtr> aggregate_expressions;
};
+// Pairs a resolved window definition with the logical plan built for it.
+struct Resolver::WindowPlan {
+ WindowPlan(const E::WindowInfo &window_info_in,
+ const L::LogicalPtr &logical_plan_in)
+ : window_info(window_info_in),
+ logical_plan(logical_plan_in) {}
+
+ // The resolved window definition.
+ const E::WindowInfo window_info;
+ // The logical plan associated with the window.
+ const L::LogicalPtr logical_plan;
+};
+
+// Query-scoped window information: the named windows of the query and the
+// window aggregate expressions collected during resolution.
+struct Resolver::WindowAggregationInfo {
+ explicit WindowAggregationInfo(const std::unordered_map<std::string, WindowPlan> &window_map_in)
+ : window_map(window_map_in) {}
+
+ // Maps a window name to its resolved window and plan (copied at construction).
+ const std::unordered_map<std::string, WindowPlan> window_map;
+ // Alias expressions that wrap window aggregate functions.
+ std::vector<E::AliasPtr> window_aggregate_expressions;
+};
+
struct Resolver::SelectListInfo {
public:
/**
@@ -973,8 +1011,36 @@ L::LogicalPtr Resolver::resolveSelect(
logical_plan, resolvePredicate(parse_predicate, &expr_resolution_info));
}
+ // Resolve WINDOW clause.
+ std::unordered_map<std::string, WindowPlan> sorted_window_map;
+ if (select_query.window_list() != nullptr) {
+ if (select_query.window_list()->size() > 1) {
+ THROW_SQL_ERROR_AT(&(*select_query.window_list()->begin()))
+ << "Currently we don't support multiple window aggregation functions";
+ }
+
+ // Sort the table according to the window.
+ for (const ParseWindow &window : *select_query.window_list()) {
+ // Check for duplicate definition.
+ // Currently this is useless since we only support one window.
+ if (sorted_window_map.find(window.name()->value()) != sorted_window_map.end()) {
+ THROW_SQL_ERROR_AT(window.name())
+ << "Duplicate definition of window " << window.name()->value();
+ }
+
+ E::WindowInfo resolved_window = resolveWindow(window, *name_resolver);
+ L::LogicalPtr sorted_logical_plan = resolveSortInWindow(logical_plan,
+ resolved_window);
+
+ WindowPlan window_plan(resolved_window, sorted_logical_plan);
+
+ sorted_window_map.emplace(window.name()->value(), window_plan);
+ }
+ }
+
QueryAggregationInfo query_aggregation_info(
(select_query.group_by() != nullptr));
+ WindowAggregationInfo window_aggregation_info(sorted_window_map);
// Resolve SELECT-list clause.
std::vector<E::NamedExpressionPtr> select_list_expressions;
@@ -984,6 +1050,7 @@ L::LogicalPtr Resolver::resolveSelect(
type_hints,
*name_resolver,
&query_aggregation_info,
+ &window_aggregation_info,
&select_list_expressions,
&has_aggregate_per_expression);
DCHECK_EQ(has_aggregate_per_expression.size(),
@@ -992,6 +1059,29 @@ L::LogicalPtr Resolver::resolveSelect(
SelectListInfo select_list_info(select_list_expressions,
has_aggregate_per_expression);
+ // Create window aggregate node if needed
+ for (const E::AliasPtr &alias : window_aggregation_info.window_aggregate_expressions) {
+ E::WindowAggregateFunctionPtr window_aggregate_function;
+ if (!E::SomeWindowAggregateFunction::MatchesWithConditionalCast(alias->expression(),
+ &window_aggregate_function)) {
+ THROW_SQL_ERROR()
+ << "Unexpected expression in window aggregation function";
+ }
+ L::LogicalPtr sorted_logical_plan;
+
+ // Get the sorted logical plan
+ const std::string window_name = window_aggregate_function->window_name();
+ if (!window_name.empty()) {
+ sorted_logical_plan = sorted_window_map.at(window_name).logical_plan;
+ } else {
+ sorted_logical_plan = resolveSortInWindow(logical_plan,
+ window_aggregate_function->window_info());
+ }
+
+ logical_plan = L::WindowAggregate::Create(sorted_logical_plan,
+ alias);
+ }
+
// Resolve GROUP BY.
std::vector<E::NamedExpressionPtr> group_by_expressions;
if (select_query.group_by() != nullptr) {
@@ -1039,7 +1129,7 @@ L::LogicalPtr Resolver::resolveSelect(
E::PredicatePtr having_predicate;
if (select_query.having() != nullptr) {
ExpressionResolutionInfo expr_resolution_info(
- *name_resolver, &query_aggregation_info, &select_list_info);
+ *name_resolver, &query_aggregation_info, &window_aggregation_info, &select_list_info);
having_predicate = resolvePredicate(
*select_query.having()->having_predicate(), &expr_resolution_info);
}
@@ -1053,7 +1143,7 @@ L::LogicalPtr Resolver::resolveSelect(
for (const ParseOrderByItem &order_by_item :
*select_query.order_by()->order_by_items()) {
ExpressionResolutionInfo expr_resolution_info(
- *name_resolver, &query_aggregation_info, &select_list_info);
+ *name_resolver, &query_aggregation_info, &window_aggregation_info, &select_list_info);
E::ScalarPtr order_by_scalar = resolveExpression(
*order_by_item.ordering_expression(),
nullptr, // No Type hint.
@@ -1528,6 +1618,89 @@ L::LogicalPtr Resolver::RenameOutputColumns(
return L::Project::Create(logical_plan, project_expressions);
}
+// Resolves a parsed window definition into an E::WindowInfo.
+//
+// PARTITION BY and ORDER BY expressions are resolved with name_resolver; each
+// must resolve to a non-constant attribute reference, otherwise a SQL error is
+// thrown at the offending parse node. The optional frame clause is converted
+// into an E::WindowFrameInfo; frame_info stays nullptr when no frame is given.
+E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
+ const NameResolver &name_resolver) {
+ std::vector<E::AttributeReferencePtr> partition_by_attributes;
+ std::vector<E::AttributeReferencePtr> order_by_attributes;
+ std::vector<bool> order_by_directions;
+ std::vector<bool> nulls_first;
+ E::WindowFrameInfo *frame_info = nullptr;
+
+ // Resolve PARTITION BY
+ if (parse_window.partition_by_expressions() != nullptr) {
+ for (const ParseExpression &unresolved_partition_by_expression :
+ *parse_window.partition_by_expressions()) {
+ ExpressionResolutionInfo expr_resolution_info(
+ name_resolver,
+ "PARTITION BY clause" /* clause_name */,
+ nullptr /* select_list_info */);
+ E::ScalarPtr partition_by_scalar = resolveExpression(
+ unresolved_partition_by_expression,
+ nullptr, // No Type hint.
+ &expr_resolution_info);
+
+ // The constant check runs before the attribute check, so a constant is
+ // reported with the more specific message.
+ if (partition_by_scalar->isConstant()) {
+ THROW_SQL_ERROR_AT(&unresolved_partition_by_expression)
+ << "Constant expression not allowed in PARTITION BY";
+ }
+
+ E::AttributeReferencePtr partition_by_attribute;
+ if (!E::SomeAttributeReference::MatchesWithConditionalCast(partition_by_scalar,
+ &partition_by_attribute)) {
+ THROW_SQL_ERROR_AT(&unresolved_partition_by_expression)
+ << "Only attribute name allowed in PARTITION BY in window definition";
+ }
+
+ partition_by_attributes.push_back(partition_by_attribute);
+ }
+ }
+
+ // Resolve ORDER BY
+ if (parse_window.order_by_expressions() != nullptr) {
+ for (const ParseOrderByItem &order_by_item :
+ *parse_window.order_by_expressions()) {
+ ExpressionResolutionInfo expr_resolution_info(
+ name_resolver,
+ "ORDER BY clause" /* clause name */,
+ nullptr /* select_list_info */);
+ E::ScalarPtr order_by_scalar = resolveExpression(
+ *order_by_item.ordering_expression(),
+ nullptr, // No Type hint.
+ &expr_resolution_info);
+
+ if (order_by_scalar->isConstant()) {
+ THROW_SQL_ERROR_AT(&order_by_item)
+ << "Constant expression not allowed in ORDER BY";
+ }
+
+ E::AttributeReferencePtr order_by_attribute;
+ if (!E::SomeAttributeReference::MatchesWithConditionalCast(order_by_scalar,
+ &order_by_attribute)) {
+ THROW_SQL_ERROR_AT(&order_by_item)
+ << "Only attribute name allowed in ORDER BY in window definition";
+ }
+
+ // The three vectors are filled in lockstep: entry i of each describes the
+ // i-th ORDER BY item.
+ order_by_attributes.push_back(order_by_attribute);
+ order_by_directions.push_back(order_by_item.is_ascending());
+ nulls_first.push_back(order_by_item.nulls_first());
+ }
+ }
+
+ // Resolve window frame
+ if (parse_window.frame_info() != nullptr) {
+ const quickstep::ParseFrameInfo *parse_frame_info = parse_window.frame_info();
+ // NOTE(review): raw allocation; presumably E::WindowInfo takes ownership of
+ // frame_info -- confirm against WindowAggregateFunction.hpp.
+ frame_info = new E::WindowFrameInfo(parse_frame_info->is_row,
+ parse_frame_info->num_preceding,
+ parse_frame_info->num_following);
+ }
+
+ return E::WindowInfo(partition_by_attributes,
+ order_by_attributes,
+ order_by_directions,
+ nulls_first,
+ frame_info);
+}
+
const CatalogRelation* Resolver::resolveRelationName(
const ParseString *relation_name) {
const CatalogRelation *relation =
@@ -1684,13 +1857,45 @@ L::LogicalPtr Resolver::resolveJoinedTableReference(
THROW_SQL_ERROR_AT(&joined_table_reference) << "Full outer join is not supported yet";
}
+// Wraps logical_plan in a Sort on the window's PARTITION BY attributes followed
+// by its ORDER BY attributes. Partition keys are always sorted ascending with
+// nulls last; ordering keys keep the direction and null placement given by
+// window_info. No limit is applied.
+L::LogicalPtr Resolver::resolveSortInWindow(
+ const L::LogicalPtr &logical_plan,
+ const E::WindowInfo &window_info) {
+ const auto num_partition_keys = window_info.partition_by_attributes.size();
+
+ // Sort keys: (p_key, o_key).
+ std::vector<E::AttributeReferencePtr> sort_keys;
+ sort_keys.insert(sort_keys.end(),
+ window_info.partition_by_attributes.begin(),
+ window_info.partition_by_attributes.end());
+ sort_keys.insert(sort_keys.end(),
+ window_info.order_by_attributes.begin(),
+ window_info.order_by_attributes.end());
+
+ // Directions: ascending for every partition key, then the ORDER BY directions.
+ std::vector<bool> is_ascending;
+ is_ascending.assign(num_partition_keys, true);
+ is_ascending.insert(is_ascending.end(),
+ window_info.order_by_directions.begin(),
+ window_info.order_by_directions.end());
+
+ // Null placement: nulls last for every partition key, then the ORDER BY flags.
+ std::vector<bool> nulls_first_flags;
+ nulls_first_flags.assign(num_partition_keys, false);
+ nulls_first_flags.insert(nulls_first_flags.end(),
+ window_info.nulls_first.begin(),
+ window_info.nulls_first.end());
+
+ return L::Sort::Create(logical_plan,
+ sort_keys,
+ is_ascending,
+ nulls_first_flags,
+ -1 /* limit */);
+}
+
void Resolver::resolveSelectClause(
const ParseSelectionClause &parse_selection,
const std::string &select_name,
const std::vector<const Type*> *type_hints,
const NameResolver &name_resolver,
QueryAggregationInfo *query_aggregation_info,
- std::vector<expressions::NamedExpressionPtr> *project_expressions,
+ WindowAggregationInfo *window_aggregation_info,
+ std::vector<E::NamedExpressionPtr> *project_expressions,
std::vector<bool> *has_aggregate_per_expression) {
project_expressions->clear();
switch (parse_selection.getSelectionType()) {
@@ -1720,6 +1925,7 @@ void Resolver::resolveSelectClause(
ExpressionResolutionInfo expr_resolution_info(
name_resolver,
query_aggregation_info,
+ window_aggregation_info,
nullptr /* select_list_info */);
const E::ScalarPtr project_scalar =
resolveExpression(*parse_project_expression,
@@ -2362,16 +2568,12 @@ E::ScalarPtr Resolver::resolveSimpleCaseExpression(
// TODO(chasseur): For now this only handles resolving aggregate functions. In
// the future it should be extended to resolve scalar functions as well.
+// TODO(Shixuan): This will handle resolving window aggregation function as well,
+// which could be extended to general scalar functions.
E::ScalarPtr Resolver::resolveFunctionCall(
const ParseFunctionCall &parse_function_call,
ExpressionResolutionInfo *expression_resolution_info) {
- std::string function_name = ToLower(parse_function_call.name()->value());
-
- // TODO(Shixuan): Add support for window aggregation function.
- if (parse_function_call.isWindow()) {
- THROW_SQL_ERROR_AT(&parse_function_call)
- << "Window Aggregation Function is not supported currently";
- }
+ const std::string function_name = ToLower(parse_function_call.name()->value());
// First check for the special case COUNT(*).
bool count_star = false;
@@ -2386,8 +2588,9 @@ E::ScalarPtr Resolver::resolveFunctionCall(
std::vector<E::ScalarPtr> resolved_arguments;
const PtrList<ParseExpression> *unresolved_arguments =
parse_function_call.arguments();
- // The first aggregate function in the arguments.
+ // The first aggregate function and window aggregate function in the arguments.
const ParseTreeNode *first_aggregate_function = nullptr;
+ const ParseTreeNode *first_window_aggregate_function = nullptr;
if (unresolved_arguments != nullptr) {
for (const ParseExpression &unresolved_argument : *unresolved_arguments) {
ExpressionResolutionInfo expr_resolution_info(
@@ -2401,6 +2604,13 @@ E::ScalarPtr Resolver::resolveFunctionCall(
first_aggregate_function =
expr_resolution_info.parse_aggregate_expression;
}
+
+ if (expr_resolution_info.hasWindowAggregate() &&
+ first_window_aggregate_function == nullptr &&
+ parse_function_call.isWindow()) {
+ first_window_aggregate_function =
+ expr_resolution_info.parse_window_aggregate_expression;
+ }
}
}
@@ -2431,6 +2641,15 @@ E::ScalarPtr Resolver::resolveFunctionCall(
<< "Aggregation of Aggregates are not allowed";
}
+ // TODO(Shixuan): We currently don't support nested window aggregation since
+ // TPC-DS doesn't have that. However, it is essentially a nested scalar
+ // function, which should be supported in the future version of Quickstep.
+ if (parse_function_call.isWindow() &&
+ first_window_aggregate_function != nullptr) {
+ THROW_SQL_ERROR_AT(first_window_aggregate_function)
+ << "Window aggregation of window aggregates is not allowed";
+ }
+
// Make sure a naked COUNT() with no arguments wasn't specified.
if ((aggregate->getAggregationID() == AggregationID::kCount)
&& (resolved_arguments.empty())
@@ -2452,6 +2671,13 @@ E::ScalarPtr Resolver::resolveFunctionCall(
<< " can not apply to the given argument(s).";
}
+ if (parse_function_call.isWindow()) {
+ return resolveWindowAggregateFunction(parse_function_call,
+ expression_resolution_info,
+ aggregate,
+ resolved_arguments);
+ }
+
// Create the optimizer representation of the resolved aggregate and an alias
// for it to appear in the output relation.
const E::AggregateFunctionPtr aggregate_function
@@ -2471,6 +2697,62 @@ E::ScalarPtr Resolver::resolveFunctionCall(
return E::ToRef(aggregate_alias);
}
+// Resolves a window aggregate function call, i.e. an aggregate applied OVER
+// either a named window or an inline window definition.
+//
+// parse_function_call: the parsed call; supplies the window name or window
+// definition and the DISTINCT flag.
+// expression_resolution_info: provides the name resolver and the window
+// aggregation info; updated with the new alias and the parse node.
+// window_aggregate: the aggregate function being applied.
+// resolved_arguments: the already-resolved arguments of the call.
+//
+// Returns an attribute reference to the alias that wraps the new window
+// aggregate function. Throws a SQL error if the named window is undefined or
+// if the query already contains a window aggregate.
+E::ScalarPtr Resolver::resolveWindowAggregateFunction(
+ const ParseFunctionCall &parse_function_call,
+ ExpressionResolutionInfo *expression_resolution_info,
+ const ::quickstep::AggregateFunction *window_aggregate,
+ const std::vector<E::ScalarPtr> &resolved_arguments) {
+ // The window aggregation info is dereferenced unconditionally below.
+ WindowAggregationInfo *window_aggregation_info =
+ expression_resolution_info->window_aggregation_info;
+ DCHECK(window_aggregation_info != nullptr);
+
+ // A window aggregate function might be defined OVER a window name or a window.
+ E::WindowAggregateFunctionPtr window_aggregate_function;
+ if (parse_function_call.window_name() != nullptr) {
+ // Bind by reference: copying the map would copy every resolved window and
+ // its plan on each call.
+ const std::unordered_map<std::string, WindowPlan> &window_map =
+ window_aggregation_info->window_map;
+ const std::string window_name = parse_function_call.window_name()->value();
+ const std::unordered_map<std::string, WindowPlan>::const_iterator map_it
+ = window_map.find(window_name);
+
+ if (map_it == window_map.end()) {
+ THROW_SQL_ERROR_AT(parse_function_call.window_name())
+ << "Undefined window " << window_name;
+ }
+
+ window_aggregate_function =
+ E::WindowAggregateFunction::Create(*window_aggregate,
+ resolved_arguments,
+ map_it->second.window_info,
+ window_name,
+ parse_function_call.is_distinct());
+ } else {
+ E::WindowInfo resolved_window = resolveWindow(*parse_function_call.window(),
+ expression_resolution_info->name_resolver);
+
+ window_aggregate_function =
+ E::WindowAggregateFunction::Create(*window_aggregate,
+ resolved_arguments,
+ resolved_window,
+ "" /* window name */,
+ parse_function_call.is_distinct());
+ }
+
+ // Number the alias by its position among the window aggregates. This avoids
+ // depending on query_aggregation_info, which can be NULL and which counts
+ // ordinary aggregates rather than window aggregates.
+ const std::string internal_alias = GenerateWindowAggregateAttributeAlias(
+ window_aggregation_info->window_aggregate_expressions.size());
+ const E::AliasPtr aggregate_alias = E::Alias::Create(context_->nextExprId(),
+ window_aggregate_function,
+ "" /* attribute_name */,
+ internal_alias,
+ "$window_aggregate" /* relation_name */);
+
+ if (!window_aggregation_info->window_aggregate_expressions.empty()) {
+ THROW_SQL_ERROR_AT(&parse_function_call)
+ << "Currently we only support single window aggregate in the query";
+ }
+
+ window_aggregation_info->window_aggregate_expressions.emplace_back(aggregate_alias);
+ expression_resolution_info->parse_window_aggregate_expression = &parse_function_call;
+ return E::ToRef(aggregate_alias);
+}
+
std::vector<E::PredicatePtr> Resolver::resolvePredicates(
const PtrList<ParsePredicate> &parse_predicates,
ExpressionResolutionInfo *expression_resolution_info) {
@@ -2794,16 +3076,20 @@ void Resolver::rewriteIfOrdinalReference(
}
}
+// Builds the internal alias "$window_aggregate<index>".
+std::string Resolver::GenerateWindowAggregateAttributeAlias(int index) {
+ std::string alias("$window_aggregate");
+ alias.append(std::to_string(index));
+ return alias;
+}
+
std::string Resolver::GenerateAggregateAttributeAlias(int index) {
- return std::string("$aggregate").append(std::to_string(index));
+ return "$aggregate" + std::to_string(index);
}
std::string Resolver::GenerateGroupingAttributeAlias(int index) {
- return std::string("$groupby").append(std::to_string(index));
+ return "$groupby" + std::to_string(index);
}
std::string Resolver::GenerateOrderingAttributeAlias(int index) {
- return std::string("$orderby").append(std::to_string(index));
+ return "$orderby" + std::to_string(index);
}
} // namespace resolver
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index a84c61c..f4024e9 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -24,11 +24,13 @@
#include <vector>
#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
#include "query_optimizer/expressions/ExprId.hpp"
#include "query_optimizer/expressions/NamedExpression.hpp"
#include "query_optimizer/expressions/Predicate.hpp"
#include "query_optimizer/expressions/SubqueryExpression.hpp"
#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
#include "query_optimizer/logical/Logical.hpp"
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
@@ -65,6 +67,7 @@ class ParseSubqueryExpression;
class ParseTableReference;
class ParseTableReferenceSignature;
class ParseTreeNode;
+class ParseWindow;
template <class T>
class PtrList;
class StorageBlockLayoutDescription;
@@ -123,6 +126,17 @@ class Resolver {
struct QueryAggregationInfo;
/**
+ * @brief Query-scoped info that contains window aggregate expressions and a
+ * window map.
+ **/
+ struct WindowAggregationInfo;
+
+ /**
+ * @brief A wrapper for resolved window and the corresponding sorted plan.
+ **/
+ struct WindowPlan;
+
+ /**
* @brief Query-scoped info that contains select-list expressions
* and whether an expression is referenced by an
* ordinal or alias reference.
@@ -271,6 +285,8 @@ class Resolver {
* @param name_resolver NameResolver to resolve the relation/attribute names.
* @param query_aggregation_info Passed down to each expression to collects
* aggregate expressions.
+ * @param window_aggregation_info Passed down to each expression to collect
+ * window aggregate expressions.
* @param project_expressions Converted SELECT-list expressions.
* @param has_aggregate_per_expression For each SELECT-list expression,
* indicates whether it contains
@@ -282,6 +298,7 @@ class Resolver {
const std::vector<const Type*> *type_hints,
const NameResolver &name_resolver,
QueryAggregationInfo *query_aggregation_info,
+ WindowAggregationInfo *window_aggregation_info,
std::vector<expressions::NamedExpressionPtr> *project_expressions,
std::vector<bool> *has_aggregate_per_expression);
@@ -359,6 +376,17 @@ class Resolver {
const ParseTableReferenceSignature &table_signature);
/**
+ * @brief Sort the input table in (p_key, o_key) order specified by the window.
+ *
+ * @param logical_plan The input logical node.
+ * @param window_info The window that the input table has to be sorted accordingly.
+ * @return A logical plan that sorts the table according to window_info.
+ **/
+ logical::LogicalPtr resolveSortInWindow(
+ const logical::LogicalPtr &logical_plan,
+ const expressions::WindowInfo &window_info);
+
+ /**
* @brief Resolves a parse expression and converts it to a scalar expression
* in the query optimizer. A non-scalar parse expression is resolved
* to an AttributeReference to another optimizer expression.
@@ -412,7 +440,8 @@ class Resolver {
* @brief Resolves a function call. For a non-scalar function, the returned
* expression is an AttributeReference to the actual resolved expression.
*
- * @note This currently only handles resolving aggregate functions.
+ * @note This currently only handles resolving aggregate functions and window
+ * aggregate functions.
*
* @param parse_function_call The function call to be resolved.
* @param expression_resolution_info Resolution info that contains the name
@@ -425,6 +454,23 @@ class Resolver {
ExpressionResolutionInfo *expression_resolution_info);
/**
+ * @brief Resolves a window aggregate function.
+ *
+ * @param parse_function_call The function call to be resolved.
+ * @param expression_resolution_info Resolution info that contains the name
+ * resolver and info to be updated after
+ * resolution.
+ * @param aggregate The aggregate function.
+ * @param resolved_arguments The resolved arguments.
+ * @return An expression in the query optimizer.
+ */
+ expressions::ScalarPtr resolveWindowAggregateFunction(
+ const ParseFunctionCall &parse_function_call,
+ ExpressionResolutionInfo *expression_resolution_info,
+ const ::quickstep::AggregateFunction *aggregate,
+ const std::vector<expressions::ScalarPtr> &resolved_arguments);
+
+ /**
* @brief Resolves a parse Predicate and converts it to a predicate in the
* query optimizer.
*
@@ -469,6 +515,15 @@ class Resolver {
const bool has_single_column);
/**
+ * @brief Resolves a window definition.
+ *
+ * @param parse_window The parsed window definition.
+ * @param name_resolver The resolver to resolve names.
+ **/
+ expressions::WindowInfo resolveWindow(const ParseWindow &parse_window,
+ const NameResolver &name_resolver);
+
+ /**
* @brief Resolves a relation name to a pointer to the corresponding
* CatalogRelation with the name.
*
@@ -501,6 +556,15 @@ class Resolver {
static std::string GenerateAggregateAttributeAlias(int index);
/**
+ * @brief Generates an internal alias for a window aggregate attribute.
+ *
+ * @param index The index of the window aggregate attribute used for
+ * generating the name.
+ * @return A string for the name.
+ */
+ static std::string GenerateWindowAggregateAttributeAlias(int index);
+
+ /**
* @brief Generates an internal alias for a grouping attribute.
*
* @param index The index of the grouping attribute used for generating the
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/strategy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/CMakeLists.txt b/query_optimizer/strategy/CMakeLists.txt
index 74f5a4b..84e151e 100644
--- a/query_optimizer/strategy/CMakeLists.txt
+++ b/query_optimizer/strategy/CMakeLists.txt
@@ -105,7 +105,8 @@ target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne
quickstep_queryoptimizer_physical_TopLevelPlan
quickstep_queryoptimizer_physical_UpdateTable
quickstep_queryoptimizer_strategy_Strategy
- quickstep_utility_Macros)
+ quickstep_utility_Macros
+ quickstep_utility_SqlError)
target_link_libraries(quickstep_queryoptimizer_strategy_Selection
glog
quickstep_queryoptimizer_LogicalToPhysicalMapper
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/strategy/OneToOne.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/OneToOne.cpp b/query_optimizer/strategy/OneToOne.cpp
index 7f59151..f49a25c 100644
--- a/query_optimizer/strategy/OneToOne.cpp
+++ b/query_optimizer/strategy/OneToOne.cpp
@@ -55,6 +55,7 @@
#include "query_optimizer/physical/TableReference.hpp"
#include "query_optimizer/physical/TopLevelPlan.hpp"
#include "query_optimizer/physical/UpdateTable.hpp"
+#include "utility/SqlError.hpp"
namespace quickstep {
namespace optimizer {
@@ -208,6 +209,10 @@ bool OneToOne::generatePlan(const L::LogicalPtr &logical_input,
update_table->predicate());
return true;
}
+ case L::LogicalType::kWindowAggregate: {
+ THROW_SQL_ERROR()
+ << "Window aggregate function is not supported currently :(";
+ }
default:
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/tests/logical_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/logical_generator/Select.test b/query_optimizer/tests/logical_generator/Select.test
index 3c152e8..e0003bf 100644
--- a/query_optimizer/tests/logical_generator/Select.test
+++ b/query_optimizer/tests/logical_generator/Select.test
@@ -1354,3 +1354,165 @@ TopLevelPlan
+-output_attributes=
+-AttributeReference[id=5,name=x,relation=,type=Int]
+-AttributeReference[id=6,name=y,relation=,type=Int]
+==
+
+# Window Aggregate Function Test.
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,false],nulls_first=[false,false]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | relation=$window_aggregate,type=Double NULL]
+| | +-WindowAggregateFunction[function=AVG,window_name=w,is_ascending=[false],
+| | nulls_first=[false],frame_mode=row,num_preceding=-1,num_following=0]
+| | +-arguments=
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-partition_by=
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-order_by=
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| +-project_list=
+| +-Alias[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+| +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+FROM test;
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,true,false,true],
+| | | nulls_first=[false,false,false,true]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | relation=$window_aggregate,type=Double NULL]
+| | +-WindowAggregateFunction[function=SUM,window_name=,
+| | is_ascending=[false,true],nulls_first=[false,true],frame_mode=range,
+| | num_preceding=3,num_following=3]
+| | +-arguments=
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-partition_by=
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-order_by=
+| | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | type=Double NULL]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| +-project_list=
+| +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| +-Alias[id=6,name=,alias=sum(float_col),relation=,type=Double NULL]
+| +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+ +-AttributeReference[id=6,name=,alias=sum(float_col),relation=,
+ type=Double NULL]
+==
+
+SELECT sum(avg(int_col) OVER w) FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=WindowAggregate
+| | | +-input=Sort[is_ascending=[true,true],nulls_first=[false,false]]
+| | | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | | | | type=Double NULL]
+| | | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | | type=VarChar(20) NULL]
+| | | | +-sort_expressions=
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | | relation=$window_aggregate,type=Double NULL]
+| | | +-WindowAggregateFunction[function=AVG,window_name=w,
+| | | is_ascending=[true],nulls_first=[false],frame_mode=row,
+| | | num_preceding=-1,num_following=0]
+| | | +-arguments=
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-partition_by=
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-order_by=
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| | +-Alias[id=7,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| | +-AggregateFunction[function=SUM]
+| | +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| | relation=$window_aggregate,type=Double NULL]
+| +-project_list=
+| +-Alias[id=7,name=,alias=sum(avg(int_col)),relation=,type=Double NULL]
+| +-AttributeReference[id=7,name=,alias=$aggregate0,relation=$aggregate,
+| type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=7,name=,alias=sum(avg(int_col)),relation=,
+ type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER w1 FROM test
+WINDOW w2 AS
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING);
+--
+ERROR: Undefined window w1 (1 : 37)
+SELECT int_col, sum(float_col) OVER w1 FROM test
+ ^
+==
+
+SELECT sum(avg(int_col)) OVER w FROM test
+WINDOW w AS
+(PARTITION BY double_col
+ ORDER BY char_col)
+--
+ERROR: Aggregation of Aggregates are not allowed (1 : 12)
+SELECT sum(avg(int_col)) OVER w FROM test
+ ^
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/tests/resolver/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/resolver/Select.test b/query_optimizer/tests/resolver/Select.test
index 141bfa0..89ab84d 100644
--- a/query_optimizer/tests/resolver/Select.test
+++ b/query_optimizer/tests/resolver/Select.test
@@ -3126,3 +3126,165 @@ FROM test;
ERROR: The substring length must be greater than 0 (1 : 8)
SELECT SUBSTRING(char_col FROM 1 FOR ...
^
+==
+
+# Window Aggregate Function Test.
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,false],nulls_first=[false,false]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | relation=$window_aggregate,type=Double NULL]
+| | +-WindowAggregateFunction[function=AVG,window_name=w,is_ascending=[false],
+| | nulls_first=[false],frame_mode=row,num_preceding=-1,num_following=0]
+| | +-arguments=
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-partition_by=
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-order_by=
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| +-project_list=
+| +-Alias[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+| +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+FROM test;
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,true,false,true],
+| | | nulls_first=[false,false,false,true]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | relation=$window_aggregate,type=Double NULL]
+| | +-WindowAggregateFunction[function=SUM,window_name=,
+| | is_ascending=[false,true],nulls_first=[false,true],frame_mode=range,
+| | num_preceding=3,num_following=3]
+| | +-arguments=
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-partition_by=
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | type=VarChar(20) NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-order_by=
+| | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | type=Double NULL]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| +-project_list=
+| +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| +-Alias[id=6,name=,alias=sum(float_col),relation=,type=Double NULL]
+| +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+ +-AttributeReference[id=6,name=,alias=sum(float_col),relation=,
+ type=Double NULL]
+==
+
+SELECT sum(avg(int_col) OVER w) FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=WindowAggregate
+| | | +-input=Sort[is_ascending=[true,true],nulls_first=[false,false]]
+| | | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | | | | type=Double NULL]
+| | | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | | type=VarChar(20) NULL]
+| | | | +-sort_expressions=
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | | relation=$window_aggregate,type=Double NULL]
+| | | +-WindowAggregateFunction[function=AVG,window_name=w,
+| | | is_ascending=[true],nulls_first=[false],frame_mode=row,
+| | | num_preceding=-1,num_following=0]
+| | | +-arguments=
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-partition_by=
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-order_by=
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| | +-Alias[id=7,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| | +-AggregateFunction[function=SUM]
+| | +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| | relation=$window_aggregate,type=Double NULL]
+| +-project_list=
+| +-Alias[id=7,name=,alias=sum(avg(int_col)),relation=,type=Double NULL]
+| +-AttributeReference[id=7,name=,alias=$aggregate0,relation=$aggregate,
+| type=Double NULL]
++-output_attributes=
+ +-AttributeReference[id=7,name=,alias=sum(avg(int_col)),relation=,
+ type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER w1 FROM test
+WINDOW w2 AS
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING);
+--
+ERROR: Undefined window w1 (1 : 37)
+SELECT int_col, sum(float_col) OVER w1 FROM test
+ ^
+==
+
+SELECT sum(avg(int_col)) OVER w FROM test
+WINDOW w AS
+(PARTITION BY double_col
+ ORDER BY char_col)
+--
+ERROR: Aggregation of Aggregates are not allowed (1 : 12)
+SELECT sum(avg(int_col)) OVER w FROM test
+ ^
[09/11] incubator-quickstep git commit: Bug fix in initialization of
probabilities.
Posted by hb...@apache.org.
Bug fix in initialization of probabilities.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/35bf47e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/35bf47e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/35bf47e9
Branch: refs/heads/scheduler++
Commit: 35bf47e9db81cb6a598ea8cbb9a06493a7fe3214
Parents: 6183e39
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:57:01 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.cpp | 7 +++--
query_execution/tests/Learner_unittest.cpp | 38 +++++++++++++++++++++++++
2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/35bf47e9/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 5d877b4..38a773b 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -184,9 +184,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
for (auto priority_iter = execution_stats_.cbegin();
priority_iter != execution_stats_.cend();
++priority_iter) {
- sum_priority_levels += priority_iter->second.size();
- priority_levels.emplace_back(priority_iter->first);
- numerators.emplace_back(priority_iter->first);
+ const std::size_t curr_priority_level = priority_iter->first;
+ sum_priority_levels += curr_priority_level;
+ priority_levels.emplace_back(curr_priority_level);
+ numerators.emplace_back(curr_priority_level);
}
if (sum_priority_levels > 0) {
probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/35bf47e9/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 74353f0..864bb22 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -84,4 +84,42 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
}
+TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel1 = 1;
+ const std::size_t kPriorityLevel2 = 2;
+ handle1.reset(new QueryHandle(1, kPriorityLevel1));
+ handle2.reset(new QueryHandle(2, kPriorityLevel2));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ learner.addQuery(*handle1);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ learner.addQuery(*handle2);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ learner.removeQuery(handle2->query_id());
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+
+ learner.removeQuery(handle1->query_id());
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+}
+
} // namespace quickstep
[08/11] incubator-quickstep git commit: Added test for adding
completion message feedback.
Posted by hb...@apache.org.
Added test for adding completion message feedback.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cb519ebd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cb519ebd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cb519ebd
Branch: refs/heads/scheduler++
Commit: cb519ebd639ecdab512cb17e13740c06e5c19a4d
Parents: 9ad3281
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 12:09:22 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 1 +
query_execution/Learner.cpp | 4 +-
query_execution/tests/Learner_unittest.cpp | 82 ++++++++++++++++++++++++-
3 files changed, 82 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cb519ebd/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 3904185..ef1ce99 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -274,6 +274,7 @@ target_link_libraries(Learner_unittest
gtest
gtest_main
quickstep_queryexecution_Learner
+ quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryoptimizer_QueryHandle)
add_test(Learner_unittest Learner_unittest)
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cb519ebd/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 0f17e7a..c7a7064 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -51,10 +51,11 @@ void Learner::addCompletionFeedback(
workorder_completion_proto.execution_time_in_microseconds(),
workorder_completion_proto.operator_index());
- // updateProbability();
if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
updateFeedbackFromQueriesInPriorityLevel(priority_level);
}
+ updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
+ updateProbabilitiesOfAllPriorityLevels();
}
void Learner::updateProbabilitiesForQueriesInPriorityLevel(
@@ -67,7 +68,6 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
return;
} else if (execution_stats_[priority_level].size() == 1u) {
DCHECK(current_probabilities_[priority_level] != nullptr);
- DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
// As we want the probability of the lone query in this priority level as
// 1, we set the numerator same as denominator.
const std::size_t numerator =
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cb519ebd/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index a1a144d..556c984 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -20,11 +20,26 @@
#include "gtest/gtest.h"
#include "query_execution/Learner.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_optimizer/QueryHandle.hpp"
namespace quickstep {
-TEST(LearnerTest, AddAndRemoveQueryTest) {
+class LearnerTest : public ::testing::Test {
+ protected:
+ serialization::NormalWorkOrderCompletionMessage createMockCompletionMessage(
+ const std::size_t query_id, const std::size_t operator_id) {
+ serialization::NormalWorkOrderCompletionMessage mock_proto_message;
+ mock_proto_message.set_operator_index(operator_id);
+ mock_proto_message.set_query_id(query_id);
+ mock_proto_message.set_worker_thread_index(0);
+ mock_proto_message.set_execution_time_in_microseconds(10);
+
+ return mock_proto_message;
+ }
+};
+
+TEST_F(LearnerTest, AddAndRemoveQueryTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle;
const std::size_t kPriorityLevel = 1;
@@ -53,7 +68,7 @@ TEST(LearnerTest, AddAndRemoveQueryTest) {
EXPECT_FALSE(learner.hasActiveQueries());
}
-TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle1, handle2;
const std::size_t kPriorityLevel = 1;
@@ -85,7 +100,7 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
}
-TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle1, handle2;
const std::size_t kPriorityLevel1 = 1;
@@ -123,4 +138,65 @@ TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
}
+TEST_F(LearnerTest, AddCompletionFeedbackSamePriorityLevelTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel = 1;
+ handle1.reset(new QueryHandle(1, kPriorityLevel));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ learner.addQuery(*handle1);
+ serialization::NormalWorkOrderCompletionMessage completion_message =
+ createMockCompletionMessage(handle1->query_id(), 0);
+
+ learner.addCompletionFeedback(completion_message);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+ handle2.reset(new QueryHandle(2, kPriorityLevel));
+ learner.addQuery(*handle2);
+ completion_message = createMockCompletionMessage(handle2->query_id(), 0);
+ learner.addCompletionFeedback(completion_message);
+
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+}
+
+TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel1 = 1;
+ const std::size_t kPriorityLevel2 = 2;
+ handle1.reset(new QueryHandle(1, kPriorityLevel1));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ learner.addQuery(*handle1);
+
+ handle2.reset(new QueryHandle(2, kPriorityLevel2));
+ learner.addQuery(*handle2);
+
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+ const std::size_t kNumIterations = 10;
+ std::vector<QueryHandle*> handles;
+ handles.emplace_back(handle1.get());
+ handles.emplace_back(handle2.get());
+ for (std::size_t iter_num = 0; iter_num < kNumIterations; ++iter_num) {
+ for (std::size_t index = 0; index < handles.size(); ++index) {
+ EXPECT_TRUE(learner.hasActiveQueries());
+ serialization::NormalWorkOrderCompletionMessage completion_message =
+ createMockCompletionMessage(handles[index]->query_id(), 0);
+ learner.addCompletionFeedback(completion_message);
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ }
+ }
+}
} // namespace quickstep
[03/11] incubator-quickstep git commit: Added ExecutionStats class
Posted by hb...@apache.org.
Added ExecutionStats class
- To keep track of query execution statistics for a given query.
- The stats class organizes execution time on a per-operator basis.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8e973b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8e973b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8e973b87
Branch: refs/heads/scheduler++
Commit: 8e973b873b1d3c96fc025845868b24db293f21c6
Parents: 714874c
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jun 19 10:06:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 6 ++
query_execution/ExecutionStats.hpp | 177 ++++++++++++++++++++++++++++++++
query_execution/PolicyEnforcer.hpp | 2 +-
3 files changed, 184 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e973b87/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..fcd4f48 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -32,6 +32,7 @@ if (ENABLE_DISTRIBUTED)
add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
endif()
add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
+add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -69,6 +70,9 @@ if (ENABLE_DISTRIBUTED)
quickstep_utility_Macros
tmb)
endif()
+target_link_libraries(quickstep_queryexecution_ExecutionStats
+ glog
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_Foreman
${GFLAGS_LIB_NAME}
glog
@@ -91,6 +95,7 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
${GFLAGS_LIB_NAME}
glog
+ quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
@@ -199,6 +204,7 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_AdmitRequestMessage
+ quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_Foreman
quickstep_queryexecution_ForemanLite
quickstep_queryexecution_PolicyEnforcer
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e973b87/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
new file mode 100644
index 0000000..f28f367
--- /dev/null
+++ b/query_execution/ExecutionStats.hpp
@@ -0,0 +1,177 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin\u2014Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief Record the execution stats of a query.
+ **/
+class ExecutionStats {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param max_entries The maximum number of entries we remember for each
+   *        operator. Must be greater than zero.
+   **/
+  explicit ExecutionStats(const std::size_t max_entries)
+      : max_entries_(max_entries), cached_stats_(std::make_pair(0, 0)) {}
+
+  /**
+   * @brief Get the number of active operators in stats.
+   **/
+  const std::size_t getNumActiveOperators() const {
+    return active_operators_.size();
+  }
+
+  /**
+   * @brief Get the current stats.
+   *
+   * @note This function updates the cache, hence it can't be const. We are lazy
+   *       in updating the cache, instead of eagerly updating the cache upon
+   *       each update.
+   *
+   * @return A pair - 1st element is total time, 2nd element is total number of
+   *         WorkOrders for the whole query. If either total is currently zero,
+   *         the last cached pair is returned instead.
+   **/
+  std::pair<std::uint64_t, std::uint64_t> getCurrentStats() {
+    std::uint64_t total_time = 0;
+    std::uint64_t total_workorders = 0;
+    for (const auto &op_entry : active_operators_) {
+      DCHECK(op_entry.second != nullptr);
+      const auto op_stats = op_entry.second->getStats();
+      total_time += op_stats.first;
+      total_workorders += op_stats.second;
+    }
+    // Refresh the cache only when both totals are non-zero (never when idle).
+    if (total_time != 0 && total_workorders != 0) {
+      cached_stats_ = std::make_pair(total_time, total_workorders);
+    }
+    return cached_stats_;
+  }
+
+  /**
+   * @brief Add a new entry to stats.
+   *
+   * @param value The value to be added.
+   * @param operator_index The operator index which the value belongs to.
+   **/
+  void addEntry(std::size_t value, std::size_t operator_index) {
+    std::unique_ptr<OperatorStats> &op_stats = active_operators_[operator_index];
+    if (op_stats == nullptr) {
+      // First entry for this operator: create its OperatorStats object.
+      op_stats.reset(new OperatorStats(max_entries_));
+    }
+    // Record the value even for a newly created operator, so that the first
+    // entry of every operator is not silently dropped.
+    op_stats->addEntry(value);
+  }
+
+  /**
+   * @brief Remove the operator with given index. This should be called only
+   *        when the given operator finishes its execution.
+   **/
+  void removeOperator(std::size_t operator_index) {
+    DCHECK(hasOperator(operator_index));
+    active_operators_.erase(operator_index);
+  }
+
+ private:
+  /**
+   * @brief Stats for an operator within the query.
+   *
+   * @note We remember only the last N entries for the operator.
+   **/
+  class OperatorStats {
+   public:
+    /**
+     * @brief Constructor.
+     *
+     * @param max_entries The maximum number of entries we remember. Typically
+     *        these are the last N (=max_entries) entries. Must be non-zero.
+     **/
+    explicit OperatorStats(const std::size_t max_entries) : max_entries_(max_entries) {
+      DCHECK_GT(max_entries, 0u);
+    }
+
+    // Returns (sum of the remembered times, number of remembered entries).
+    inline std::pair<std::uint64_t, std::size_t> getStats() const {
+      std::uint64_t total_time = 0;  // 64-bit sum; an int would truncate.
+      for (const std::uint64_t time_value : times_) {
+        total_time += time_value;
+      }
+      return std::make_pair(total_time, times_.size());
+    }
+
+    inline void addEntry(std::uint64_t time_value) {
+      if (times_.size() == max_entries_) {
+        times_.pop_front();  // Evict the oldest entry to make room.
+      }
+      times_.push_back(time_value);
+      DCHECK_LE(times_.size(), max_entries_);
+    }
+
+   private:
+    const std::size_t max_entries_;
+    std::deque<std::uint64_t> times_;
+
+    DISALLOW_COPY_AND_ASSIGN(OperatorStats);
+  };
+
+  /**
+   * @brief Check if the operator with given index is present in the stats.
+   **/
+  inline bool hasOperator(const std::size_t operator_index) const {
+    return active_operators_.find(operator_index) != active_operators_.end();
+  }
+
+  const std::size_t max_entries_;
+
+  std::unordered_map<std::size_t, std::unique_ptr<OperatorStats>>
+      active_operators_;
+
+  // Cached stats for the whole query.
+  std::pair<std::uint64_t, std::uint64_t> cached_stats_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExecutionStats);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e973b87/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 470ff2a..b7f5735 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -214,4 +214,4 @@ class PolicyEnforcer {
} // namespace quickstep
-#endif // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
+#endif // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_
[11/11] incubator-quickstep git commit: Added GFLAG to learner.
Posted by hb...@apache.org.
Added GFLAG to learner.
- To control the number of work order execution statistics
that are maintained in the Learner, for a given query.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9ad3281f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9ad3281f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9ad3281f
Branch: refs/heads/scheduler++
Commit: 9ad3281f2c358a4232413792565e573cfa358c9f
Parents: 35bf47e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 11:39:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.cpp | 25 +++++++-
query_execution/Learner.hpp | 76 ++++++++++++-------------
query_execution/tests/Learner_unittest.cpp | 17 +++---
3 files changed, 69 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ad3281f/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 38a773b..0f17e7a 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -34,6 +34,11 @@
namespace quickstep {
+DEFINE_uint64(max_past_entries_learner,
+ 10,
+ "The maximum number of past WorkOrder execution statistics"
+ " entries for a query");
+
void Learner::addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto) {
@@ -90,8 +95,7 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
}
}
-void Learner::updateProbabilitiesOfAllPriorityLevels(
- const std::size_t priority_level) {
+void Learner::updateProbabilitiesOfAllPriorityLevels() {
if (!hasFeedbackFromAllPriorityLevels() ||
has_feedback_from_all_queries_.empty()) {
// Either we don't have enough feedback messages from all the priority
@@ -114,7 +118,7 @@ void Learner::updateProbabilitiesOfAllPriorityLevels(
total_time_curr_level += mean_workorder_entry.second;
}
const std::size_t num_queries_in_priority_level =
- execution_stats_[priority_level].size();
+ execution_stats_[curr_priority_level].size();
DCHECK_GT(num_queries_in_priority_level, 0u);
predicted_time_for_level[curr_priority_level] =
total_time_curr_level / num_queries_in_priority_level;
@@ -195,4 +199,19 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
}
}
+void Learner::initializeQuery(const QueryHandle &query_handle) {
+ const std::size_t priority_level = query_handle.query_priority();
+ const std::size_t query_id = query_handle.query_id();
+ DCHECK(isPriorityLevelPresent(priority_level));
+ query_id_to_priority_lookup_[query_id] = priority_level;
+ // The max_past_entries_learner gflag caps the stats entries kept per operator.
+ execution_stats_[priority_level].emplace_back(
+ query_id,
+ std::unique_ptr<ExecutionStats>(
+ new ExecutionStats(FLAGS_max_past_entries_learner)));
+ // As we are initializing the query, we obviously haven't gotten any
+ // feedback message for this query. Hence mark the following field as false.
+ has_feedback_from_all_queries_[priority_level] = false;
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ad3281f/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index fb0e4cb..073b693 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -30,15 +30,10 @@
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
-#include "gflags/gflags.h"
#include "glog/logging.h"
namespace quickstep {
-/*DECLARE_int32(max_past_entries_learner,
- 10,
- "The maximum number of past WorkOrder execution statistics"
- " entries for a query");*/
/** \addtogroup QueryExecution
* @{
*/
@@ -94,14 +89,6 @@ class Learner {
}
}
- void updateProbabilitiesForQueriesInPriorityLevel(
- const std::size_t priority_level, const std::size_t query_id);
-
- // TODO(harshad) - Cache internal results from previous invocation of this
- // function and reuse them. There's a lot of redundancy in computations
- // at this point.
- void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
-
inline const bool hasActiveQueries() const {
return !query_id_to_priority_lookup_.empty();
}
@@ -122,6 +109,29 @@ class Learner {
private:
/**
+ * @brief Update the probabilities for queries in the given priority level.
+ *
+ * @note This function is called after the learner receives a completion
+ * feedback message from a given query.
+ *
+ * @param priority_level The priority level.
+ * @param query_id The ID of the query for which a completion feedback message
+ * has been received.
+ *
+ **/
+ void updateProbabilitiesForQueriesInPriorityLevel(
+ const std::size_t priority_level, const std::size_t query_id);
+
+ /**
+ * @brief Update the probabilities of all the priority levels.
+ *
+ * TODO(harshad) - Cache internal results from previous invocation of this
+ * function and reuse them. There's a lot of redundancy in computations
+ * at this point.
+ **/
+ void updateProbabilitiesOfAllPriorityLevels();
+
+ /**
* @brief Initialize the default probabilities for the queries.
**/
void initializeDefaultProbabilitiesForAllQueries();
@@ -135,18 +145,14 @@ class Learner {
* @brief Initialize the data structures for a given priority level, if none
* exist. If there are already data structures for the given priority
* level, do nothing.
+ *
+ * @note This function should be followed by a relearn() call, to insert this
+ * priority level into probabilities_of_priority_levels_.
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
- // Calculate the default probability for the priority level here and use
- // it instead of 0.5 here.
- // TODO(harshad) - Correct this.
- /*const float new_denominator =
- probabilities_of_priority_levels_[priority_level]->getDenominator();
- probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
- priority_level, priority_level, new_denominator);*/
execution_stats_[priority_level];
}
}
@@ -169,6 +175,10 @@ class Learner {
/**
* @brief Check if the Learner has presence of the given priority level.
+ *
+ * @param priority_level The priority level.
+ *
+ * @return True if present, false otherwise.
**/
inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
DCHECK_EQ((current_probabilities_.find(priority_level) ==
@@ -178,7 +188,7 @@ class Learner {
}
/**
- * @brief Check if the query is present.
+ * @brief Check if the query is present in local data structures.
**/
inline bool isQueryPresent(const std::size_t query_id) const {
return query_id_to_priority_lookup_.find(query_id) !=
@@ -190,20 +200,7 @@ class Learner {
*
* @param query_handle The query handle for the new query.
**/
- void initializeQuery(const QueryHandle &query_handle) {
- const std::size_t priority_level = query_handle.query_priority();
- const std::size_t query_id = query_handle.query_id();
- DCHECK(isPriorityLevelPresent(priority_level));
- query_id_to_priority_lookup_[query_id] = priority_level;
- execution_stats_[priority_level].emplace_back(
- query_id,
- std::unique_ptr<ExecutionStats>(
- // new ExecutionStats(FLAGS_max_past_entries_learner)));
- new ExecutionStats(10)));
- // As we are initializing the query, we obviously haven't gotten any
- // feedback message for this query. Hence mark the following field as false.
- has_feedback_from_all_queries_[priority_level] = false;
- }
+ void initializeQuery(const QueryHandle &query_handle);
/**
* @brief Get the execution stats object for the given query.
@@ -222,9 +219,10 @@ class Learner {
}
/**
- * @brief This function works well when the query and priority level exists
- * in the data structures.
+ * @brief Get a mutable iterator to the execution stats for a given query.
*
+ * @note This function works well when the query and priority level exists
+ * in the data structures.
**/
inline std::vector<
std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
@@ -244,6 +242,9 @@ class Learner {
return stats_iter;
}
+ /**
+ * @brief Get a query's priority level given its ID.
+ **/
inline const std::size_t getQueryPriority(const std::size_t query_id) const {
const auto it = query_id_to_priority_lookup_.find(query_id);
DCHECK(it != query_id_to_priority_lookup_.end());
@@ -366,7 +367,6 @@ class Learner {
// Key = priority level. Value = A boolean that indicates if we have received
// feedback from all the queries in the given priority level.
- // TODO(harshad) - Invalidate the cache whenever needed.
std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
DISALLOW_COPY_AND_ASSIGN(Learner);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ad3281f/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 864bb22..a1a144d 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -27,28 +27,29 @@ namespace quickstep {
TEST(LearnerTest, AddAndRemoveQueryTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle;
- const std::size_t kPriorityLevel1 = 1;
- handle.reset(new QueryHandle(1, kPriorityLevel1));
+ const std::size_t kPriorityLevel = 1;
+ handle.reset(new QueryHandle(1, kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
learner.addQuery(*handle);
EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_TRUE(learner.hasActiveQueries());
+
learner.removeQuery(handle->query_id());
EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
- const std::size_t kPriorityLevel2 = 1;
- handle.reset(new QueryHandle(1, kPriorityLevel2));
+ handle.reset(new QueryHandle(2, kPriorityLevel));
learner.addQuery(*handle);
EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_TRUE(learner.hasActiveQueries());
+
learner.removeQuery(handle->query_id());
EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
- EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
}
[10/11] incubator-quickstep git commit: API to find the highest
priority level in the learner
Posted by hb...@apache.org.
API to find the highest priority level in the learner
- Unit tests to test the feature.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5514e009
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5514e009
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5514e009
Branch: refs/heads/scheduler++
Commit: 5514e0090c1d753424601bc2f2cb165984b0766e
Parents: cb519eb
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 14:45:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 12 +++--
query_execution/Learner.cpp | 33 +++++++++++++
query_execution/Learner.hpp | 30 +++++++-----
query_execution/ProbabilityStore.hpp | 10 ++--
query_execution/QueryExecutionTypedefs.hpp | 2 +
query_execution/tests/Learner_unittest.cpp | 64 +++++++++++++++++++++++++
6 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ef1ce99..4639617 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -76,7 +76,7 @@ target_link_libraries(quickstep_queryexecution_ExecutionStats
glog
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_Foreman
- ${GFLAGS_LIB_NAME}
+ ${GFLAGS_LIB_NAME}
glog
quickstep_queryexecution_AdmitRequestMessage
quickstep_queryexecution_ForemanLite
@@ -99,7 +99,8 @@ target_link_libraries(quickstep_queryexecution_Learner
glog
quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_ProbabilityStore
- quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryoptimizer_QueryHandle
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
@@ -274,9 +275,10 @@ target_link_libraries(Learner_unittest
gtest
gtest_main
quickstep_queryexecution_Learner
- quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryoptimizer_QueryHandle)
-add_test(Learner_unittest Learner_unittest)
+add_test(Learner_unittest Learner_unittest)
add_executable(ProbabilityStore_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
@@ -284,7 +286,7 @@ target_link_libraries(ProbabilityStore_unittest
gtest
gtest_main
quickstep_queryexecution_ProbabilityStore)
-add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
add_executable(QueryManager_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index c7a7064..720df33 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,6 +26,7 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
@@ -39,6 +40,11 @@ DEFINE_uint64(max_past_entries_learner,
"The maximum number of past WorkOrder execution statistics"
" entries for a query");
+Learner::Learner()
+ : highest_priority_level_(kInvalidPriorityLevel) {  // Starts out invalid.
+ probabilities_of_priority_levels_.reset(new ProbabilityStore());
+}
+
void Learner::addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto) {
@@ -214,4 +220,31 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
has_feedback_from_all_queries_[priority_level] = false;
}
+void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (execution_stats_[priority_level].empty()) {  // No queries left in level.
+ execution_stats_.erase(priority_level);
+ current_probabilities_.erase(priority_level);
+ probabilities_of_priority_levels_->removeObject(priority_level);
+ has_feedback_from_all_queries_.erase(priority_level);
+ if (hasActiveQueries()) {
+ if (static_cast<int>(priority_level) == highest_priority_level_) {
+ // The removed level was the highest one, so the maximum is recomputed.
+ std::size_t new_highest_priority_level = 0;
+ // Scan the remaining levels (keys of execution_stats_) for the largest.
+ for (auto priority_level_it = execution_stats_.cbegin();
+ priority_level_it != execution_stats_.cend();
+ ++priority_level_it) {
+ if (priority_level_it->first > new_highest_priority_level) {
+ new_highest_priority_level = priority_level_it->first;
+ }
+ }
+ highest_priority_level_ = static_cast<int>(new_highest_priority_level);
+ }
+ } else {  // No active queries remain at all.
+ highest_priority_level_ = kInvalidPriorityLevel;
+ }
+ }
+}
+
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 073b693..f99b1c6 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -43,9 +43,7 @@ class Learner {
/**
* @brief Constructor.
**/
- Learner() {
- probabilities_of_priority_levels_.reset(new ProbabilityStore());
- }
+ Learner();
void addCompletionFeedback(
const serialization::NormalWorkOrderCompletionMessage
@@ -107,6 +105,16 @@ class Learner {
return query_id_to_priority_lookup_.size();
}
+ /**
+ * @brief Get the highest priority level among the active queries.
+ *
+ * @return The highest priority level. If the system is empty it returns
+ * kInvalidPriorityLevel.
+ **/
+ inline const int getHighestPriorityLevel() const {
+ return highest_priority_level_;
+ }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -151,9 +159,13 @@ class Learner {
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
+ CHECK_GT(priority_level, 0u) << "Priority level should be non-zero";
if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
execution_stats_[priority_level];
+ if (static_cast<int>(priority_level) > highest_priority_level_) {
+ highest_priority_level_ = static_cast<int>(priority_level);  // New maximum.
+ }
}
}
@@ -163,15 +175,7 @@ class Learner {
*
* @param priority_level The priority level.
**/
- inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
- DCHECK(isPriorityLevelPresent(priority_level));
- if (execution_stats_[priority_level].empty()) {
- execution_stats_.erase(priority_level);
- current_probabilities_.erase(priority_level);
- probabilities_of_priority_levels_->removeObject(priority_level);
- has_feedback_from_all_queries_.erase(priority_level);
- }
- }
+ void checkAndRemovePriorityLevel(const std::size_t priority_level);
/**
* @brief Check if the Learner has presence of the given priority level.
@@ -369,6 +373,8 @@ class Learner {
// feedback from all the queries in the given priority level.
std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+ int highest_priority_level_;
+
DISALLOW_COPY_AND_ASSIGN(Learner);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 347df89..233dd2e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -212,9 +212,10 @@ class ProbabilityStore {
cumulative_probability);
cumulative_probability += p.second.second;
}
- // Adjust the last cumulative probability manually to 1.0, so that
+ DCHECK(!cumulative_probabilities_.empty());
+ // Adjust the last cumulative probability manually to 1, so that
// floating addition related rounding issues are ignored.
- cumulative_probabilities_.back().updateProbability(1.0);
+ cumulative_probabilities_.back().updateProbability(1);
}
/**
@@ -233,7 +234,9 @@ class ProbabilityStore {
public:
ProbabilityInfo(const std::size_t property, const float probability)
: property_(property), probability_(probability) {
- DCHECK_LE(probability, 1.0);
+ // As GLOG doesn't provide DEBUG only checks for less than equal
+ // comparison for floats, we can't ensure that probability is less than
+ // 1.0.
}
ProbabilityInfo(const ProbabilityInfo &other) = default;
@@ -241,7 +244,6 @@ class ProbabilityStore {
ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
void updateProbability(const float new_probability) {
- DCHECK_LE(new_probability, 1.0);
probability_ = new_probability;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..e13f3e0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -43,6 +43,8 @@ typedef tmb::TaggedMessage TaggedMessage;
typedef tmb::client_id client_id;
typedef tmb::message_type_id message_type_id;
+const int kInvalidPriorityLevel = -1;
+
using ClientIDMap = ThreadIDBasedMap<client_id,
'C',
'l',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 556c984..107576f 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -15,12 +15,18 @@
* limitations under the License.
**/
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
#include <memory>
+#include <random>
+#include <vector>
#include "gtest/gtest.h"
#include "query_execution/Learner.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
namespace quickstep {
@@ -199,4 +205,62 @@ TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
}
}
}
+
+TEST_F(LearnerTest, HighestPriorityLevelTest) {
+ std::vector<std::size_t> priorities_insertion_order;
+ std::vector<std::size_t> priorities_removal_order;
+ const std::size_t kNumPrioritiesToTest = 20;
+ for (std::size_t priority_num = 1;
+ priority_num <= kNumPrioritiesToTest;
+ ++priority_num) {
+ // Note: Priority level should be non-zero, hence we begin from 1.
+ priorities_insertion_order.emplace_back(priority_num);
+ priorities_removal_order.emplace_back(priority_num);
+ }
+
+ // Randomize both orders; the generator is seeded from std::random_device.
+ std::random_device rd;
+ std::mt19937 g(rd());
+
+ std::shuffle(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ g);
+
+ std::shuffle(priorities_removal_order.begin(),
+ priorities_removal_order.end(),
+ g);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+
+ std::unique_ptr<QueryHandle> handle;
+ // First insert the queries in the order of priorities as defined by
+ // priorities_insertion_order.
+ for (auto it = priorities_insertion_order.begin();
+ it != priorities_insertion_order.end();
+ ++it) {
+ // Note that the query ID is kept the same as priority level for simplicity.
+ handle.reset(new QueryHandle(*it, *it));
+ learner.addQuery(*handle);
+ const std::size_t max_priority_so_far =
+ *(std::max_element(priorities_insertion_order.begin(), it + 1));
+ EXPECT_EQ(static_cast<int>(max_priority_so_far),
+ learner.getHighestPriorityLevel());
+ }
+ // Now remove the queries in the order of priorities as defined by
+ // priorities_removal_order.
+ for (auto it = priorities_removal_order.begin();
+ it != priorities_removal_order.end();
+ ++it) {
+ // Before each removal, the highest level is the max of those not yet removed.
+ const std::size_t max_priority_so_far =
+ *(std::max_element(it, priorities_removal_order.end()));
+ EXPECT_EQ(static_cast<int>(max_priority_so_far),
+ learner.getHighestPriorityLevel());
+ learner.removeQuery(*it);
+ }
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+}
+
} // namespace quickstep
[04/11] incubator-quickstep git commit: Get number of active queries
(total and by priority level)
Posted by hb...@apache.org.
Get number of active queries (total and by priority level)
- Unit tests to check the feature.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6183e393
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6183e393
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6183e393
Branch: refs/heads/scheduler++
Commit: 6183e3933f5d90ffb23cffcefd1719e79a001727
Parents: d5c6c9d
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:42:56 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500
----------------------------------------------------------------------
query_execution/Learner.hpp | 16 +++++++-
query_execution/tests/Learner_unittest.cpp | 54 ++++++++++++++++++++-----
2 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6183e393/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 9d51877..fb0e4cb 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -102,10 +102,24 @@ class Learner {
// at this point.
void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
- inline const std::size_t hasActiveQueries() const {
+ inline const bool hasActiveQueries() const {
return !query_id_to_priority_lookup_.empty();
}
+  /**
+   * @brief Get the number of active queries in the given priority level, or 0
+   *        if the priority level is not present.
+   **/
+  inline const std::size_t getNumActiveQueriesInPriorityLevel(
+      const std::size_t priority_level) const {
+    const auto level_it = execution_stats_.find(priority_level);
+    return level_it == execution_stats_.end() ? 0 : level_it->second.size();
+  }
+
+ inline const std::size_t getTotalNumActiveQueries() const {
+ return query_id_to_priority_lookup_.size();
+ }
+
private:
/**
* @brief Initialize the default probabilities for the queries.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6183e393/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index cab241a..74353f0 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -24,32 +24,64 @@
namespace quickstep {
-TEST(LearnerTest, AddQueryTest) {
+TEST(LearnerTest, AddAndRemoveQueryTest) {
Learner learner;
std::unique_ptr<QueryHandle> handle;
- handle.reset(new QueryHandle(1, 1));
+ const std::size_t kPriorityLevel1 = 1;
+ handle.reset(new QueryHandle(1, kPriorityLevel1));
EXPECT_FALSE(learner.hasActiveQueries());
learner.addQuery(*handle);
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
EXPECT_TRUE(learner.hasActiveQueries());
+ learner.removeQuery(handle->query_id());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+ EXPECT_FALSE(learner.hasActiveQueries());
+
+ const std::size_t kPriorityLevel2 = 1;
+ handle.reset(new QueryHandle(1, kPriorityLevel2));
+ learner.addQuery(*handle);
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_TRUE(learner.hasActiveQueries());
+ learner.removeQuery(handle->query_id());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+ EXPECT_FALSE(learner.hasActiveQueries());
}
-TEST(LearnerTest, RemoveQueryTest) {
+TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
Learner learner;
- std::unique_ptr<QueryHandle> handle;
- handle.reset(new QueryHandle(1, 1));
+ std::unique_ptr<QueryHandle> handle1, handle2;
+ const std::size_t kPriorityLevel = 1;
+ handle1.reset(new QueryHandle(1, kPriorityLevel));
+ handle2.reset(new QueryHandle(2, kPriorityLevel));
EXPECT_FALSE(learner.hasActiveQueries());
- learner.addQuery(*handle);
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+ learner.addQuery(*handle1);
EXPECT_TRUE(learner.hasActiveQueries());
- learner.removeQuery(handle->query_id());
- EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+ learner.addQuery(*handle2);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
- handle.reset(new QueryHandle(2, 1));
- learner.addQuery(*handle);
+ learner.removeQuery(handle1->query_id());
EXPECT_TRUE(learner.hasActiveQueries());
- learner.removeQuery(handle->query_id());
+ EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+ learner.removeQuery(handle2->query_id());
+
EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+ EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
}
} // namespace quickstep
[05/11] incubator-quickstep git commit: Created Learner class.
Posted by hb...@apache.org.
Created Learner class.
- Learner keeps track of statistics of concurrent queries
- It maintains the probabilities for individual queries as well as the
priority levels in the system.
- Changes in ProbabilityStore class including addition of numerator,
denominator and more unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/75ec7f05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/75ec7f05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/75ec7f05
Branch: refs/heads/scheduler++
Commit: 75ec7f0539285286b73f14a0d774bc76c8bfc5f5
Parents: 1b24694
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 15:54:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 11 +
query_execution/ExecutionStats.hpp | 18 +
query_execution/Learner.cpp | 195 ++++++++++
query_execution/Learner.hpp | 352 +++++++++++++++++++
query_execution/PolicyEnforcer.cpp | 2 +
query_execution/ProbabilityStore.hpp | 148 ++++++--
.../tests/ProbabilityStore_unittest.cpp | 45 ++-
7 files changed, 728 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 18ae0da..cb0f815 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,7 @@ add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitR
add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -93,11 +94,20 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
quickstep_threading_Thread
quickstep_utility_Macros
tmb)
+target_link_libraries(quickstep_queryexecution_Learner
+ ${GFLAGS_LIB_NAME}
+ glog
+ quickstep_queryexecution_ExecutionStats
+ quickstep_queryexecution_ProbabilityStore
+ quickstep_queryexecution_QueryExecutionMessages_proto
+ quickstep_queryoptimizer_QueryHandle
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_PolicyEnforcer
${GFLAGS_LIB_NAME}
glog
quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_Learner
quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
@@ -212,6 +222,7 @@ target_link_libraries(quickstep_queryexecution
quickstep_queryexecution_ExecutionStats
quickstep_queryexecution_Foreman
quickstep_queryexecution_ForemanLite
+ quickstep_queryexecution_Learner
quickstep_queryexecution_PolicyEnforcer
quickstep_queryexecution_QueryContext
quickstep_queryexecution_QueryContext_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index f28f367..769c7a4 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,6 +58,20 @@ class ExecutionStats {
}
/**
+ * @brief Check if there are any stats present.
+ **/
+ inline bool hasStats() const {
+   // Stats are "present" as soon as one active operator has recorded an
+   // entry. An object without any active operator therefore reports false,
+   // instead of the vacuous true an "all operators" check would produce.
+   for (auto it = active_operators_.begin();
+        it != active_operators_.end();
+        ++it) {
+     if (it->second->hasStatsForOperator()) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /**
* @brief Get the current stats.
*
* @note This function updates the cache, hence it can't be const. We are lazy
@@ -145,6 +159,10 @@ class ExecutionStats {
DCHECK_LE(times_.size(), max_entries_);
}
+ /**
+  * @brief Check if at least one execution time has been recorded in times_.
+  **/
+ inline bool hasStatsForOperator() const {
+   return times_.size() != 0u;
+ }
+
private:
const std::size_t max_entries_;
std::deque<std::uint64_t> times_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
new file mode 100644
index 0000000..72c68f0
--- /dev/null
+++ b/query_execution/Learner.cpp
@@ -0,0 +1,195 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include "query_execution/Learner.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Records the execution time carried by a normal work order completion message
+// in the statistics of the query the work order belongs to. Afterwards, if the
+// query's priority level does not yet have feedback from all of its queries,
+// the per-level feedback flag is refreshed.
+void Learner::addCompletionFeedback(
+    const serialization::NormalWorkOrderCompletionMessage
+        &workorder_completion_proto) {
+  const std::size_t completed_query_id = workorder_completion_proto.query_id();
+  DCHECK(isQueryPresent(completed_query_id));
+
+  ExecutionStats *query_stats = getExecutionStats(completed_query_id);
+  DCHECK(query_stats != nullptr);
+  query_stats->addEntry(
+      workorder_completion_proto.execution_time_in_microseconds(),
+      workorder_completion_proto.operator_index());
+
+  // updateProbability();
+  const std::size_t level_of_query = getQueryPriority(completed_query_id);
+  if (hasFeedbackFromAllQueriesInPriorityLevel(level_of_query)) {
+    return;
+  }
+  updateFeedbackFromQueriesInPriorityLevel(level_of_query);
+}
+
+// Updates the probability of the given query within its priority level.
+//
+// - No query in the level: nothing to update (logged).
+// - Exactly one query in the level: its numerator is set to the store's
+//   current denominator so that its probability is 1.
+// - Otherwise: the query's share is (1 / mean work order time of the query)
+//   over the sum of (1 / mean work order time) of all queries in the level,
+//   i.e. queries with shorter work orders get a higher probability.
+//
+// priority_level must be present in the Learner and query_id is expected to
+// belong to that level.
+void Learner::updateProbabilitiesForQueriesInPriorityLevel(
+    const std::size_t priority_level, const std::size_t query_id) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (execution_stats_[priority_level].empty()) {
+    LOG(INFO) << "Updating probabilities for query ID: " << query_id
+              << " and priority level: " << priority_level
+              << " that has no queries";
+    return;
+  } else if (execution_stats_[priority_level].size() == 1u) {
+    DCHECK(current_probabilities_[priority_level] != nullptr);
+    DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
+    // As we want the probability of the lone query in this priority level as
+    // 1, we set the numerator same as denominator.
+    const std::size_t numerator =
+        current_probabilities_[priority_level]->getDenominator();
+    current_probabilities_[priority_level]->addOrUpdateObject(query_id,
+                                                              numerator);
+    return;
+  }
+  // Else, there is more than one query for the given priority level.
+  std::unordered_map<std::size_t, std::size_t> mean_workorders_per_query =
+      getMeanWorkOrderTimesForQueriesInPriorityLevel(priority_level);
+  const float denominator = calculateDenominator(mean_workorders_per_query);
+  const auto query_entry = mean_workorders_per_query.find(query_id);
+  DCHECK(query_entry != mean_workorders_per_query.end());
+  if (denominator == 0 || query_entry == mean_workorders_per_query.end()) {
+    // At least one of the queries has predicted time for next work order as 0
+    // (or the query is unknown in this level). In such a case, we don't update
+    // the probabilities and continue to use the older probabilities.
+    return;
+  }
+  // The denominator is a sum of inverse mean times, so the numerator has to be
+  // the inverse mean time of this query (not the mean time itself); otherwise
+  // the ratio is not a probability. A non-zero denominator guarantees that
+  // every mean time, including this one, is non-zero.
+  current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+      query_id, 1 / static_cast<float>(query_entry->second), denominator);
+}
+
+// Recomputes the probabilities of all the priority levels from the predicted
+// (mean) work order execution times of the queries in each level.
+//
+// Nothing is updated when feedback is still missing for some priority level,
+// when there are no active queries, or when no level has a non-zero predicted
+// time.
+//
+// The priority_level parameter is not needed by the computation (every level
+// is visited); it is kept so that existing callers keep compiling.
+void Learner::updateProbabilitiesOfAllPriorityLevels(
+    const std::size_t priority_level) {
+  static_cast<void>(priority_level);
+  if (!hasFeedbackFromAllPriorityLevels() ||
+      has_feedback_from_all_queries_.empty()) {
+    // Either we don't have enough feedback messages from all the priority
+    // levels OR there are no active queries in the system.
+    return;
+  }
+  // Compute the predicted work order execution times for all the levels.
+  std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
+  std::size_t sum_active_priorities = 0;
+  for (const auto &priority_entry : has_feedback_from_all_queries_) {
+    const std::size_t curr_priority_level = priority_entry.first;
+    sum_active_priorities += curr_priority_level;
+    // For each query, find its predicted work order execution time.
+    const std::unordered_map<std::size_t, std::size_t>
+        mean_workorders_all_queries_curr_level =
+            getMeanWorkOrderTimesForQueriesInPriorityLevel(
+                curr_priority_level);
+    std::size_t total_time_curr_level = 0;
+    for (const auto &mean_workorder_entry :
+         mean_workorders_all_queries_curr_level) {
+      total_time_curr_level += mean_workorder_entry.second;
+    }
+    // The average must be taken over the queries of the level being visited,
+    // not over the level passed in as the function argument.
+    const std::size_t num_queries_in_priority_level =
+        execution_stats_[curr_priority_level].size();
+    DCHECK_GT(num_queries_in_priority_level, 0u);
+    predicted_time_for_level[curr_priority_level] =
+        total_time_curr_level / num_queries_in_priority_level;
+  }
+  DCHECK_GT(sum_active_priorities, 0u);
+  // Now compute the allowable number of work orders for each priority level
+  // that can be executed given a unit total time.
+  // Key = priority level, value = the # of WO mentioned above.
+  std::unordered_map<std::size_t, float> num_workorders_for_level;
+  float total_num_workorders = 0;
+  for (const auto &predicted_time_entry : predicted_time_for_level) {
+    const std::size_t curr_priority_level = predicted_time_entry.first;
+    // This ratio is a fraction, so it has to stay a float: storing it in an
+    // integer would truncate it to 0 for every level.
+    const float num_workorders_for_curr_level =
+        (predicted_time_entry.second == 0)
+            ? 0.0f
+            : static_cast<float>(curr_priority_level) /
+                  sum_active_priorities /
+                  static_cast<float>(predicted_time_entry.second);
+    num_workorders_for_level[curr_priority_level] =
+        num_workorders_for_curr_level;
+    total_num_workorders += num_workorders_for_curr_level;
+  }
+  if (total_num_workorders == 0) {
+    // No priority level can be selected at this point.
+    return;
+  }
+  // Finally, compute the probabilities.
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  for (const auto &num_workorders_entry : num_workorders_for_level) {
+    priority_levels.emplace_back(num_workorders_entry.first);
+    numerators.emplace_back(num_workorders_entry.second);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, total_num_workorders);
+}
+
+// Rebuilds default_probabilities_ for every priority level in
+// execution_stats_: each query of a level gets numerator 1 and the common
+// denominator is the number of queries in that level.
+void Learner::initializeDefaultProbabilitiesForAllQueries() {
+  for (const auto &level_entry : execution_stats_) {
+    const std::size_t level = level_entry.first;
+    const auto &queries_in_level = level_entry.second;
+    DCHECK(!queries_in_level.empty());
+
+    // Collect the IDs of all the queries in this level.
+    std::vector<std::size_t> ids_in_level;
+    for (const auto &query_entry : queries_in_level) {
+      ids_in_level.emplace_back(query_entry.first);
+    }
+
+    // Numerator for each query is 1.0
+    // The common denominator is number of queries with the given priority
+    // level.
+    const std::vector<float> unit_numerators(ids_in_level.size(), 1.0);
+
+    // Start from a fresh probability store for this level.
+    default_probabilities_[level].reset(new ProbabilityStore());
+    default_probabilities_[level]->addOrUpdateObjectsNewDenominator(
+        ids_in_level, unit_numerators, ids_in_level.size());
+  }
+}
+
+// Resets probabilities_of_priority_levels_ so that the default probability of
+// each priority level present in execution_stats_ is proportional to the level
+// itself: numerator = priority level, denominator = sum of all the levels.
+void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
+  probabilities_of_priority_levels_.reset(new ProbabilityStore());
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  float sum_priority_levels = 0;
+  for (auto priority_iter = execution_stats_.cbegin();
+       priority_iter != execution_stats_.cend();
+       ++priority_iter) {
+    // The numerators are the priority levels, so the denominator must be the
+    // sum of the priority levels (not the number of queries per level) for
+    // the resulting values to add up to 1.
+    sum_priority_levels += priority_iter->first;
+    priority_levels.emplace_back(priority_iter->first);
+    numerators.emplace_back(priority_iter->first);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, sum_priority_levels);
+}
+
+} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
new file mode 100644
index 0000000..64120a7
--- /dev/null
+++ b/query_execution/Learner.hpp
@@ -0,0 +1,352 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+// Command line flag: upper bound on the number of past WorkOrder execution
+// statistics entries; it is passed to the ExecutionStats constructor of every
+// query added to the Learner.
+// NOTE(review): this DEFINE lives in a header that is included by both
+// Learner.cpp and PolicyEnforcer.cpp, so the flag presumably gets defined once
+// per translation unit. Consider DECLARE_int32 here with the DEFINE_int32
+// moved to Learner.cpp - confirm against the gflags documentation.
+DEFINE_int32(max_past_entries_learner,
+ 10,
+ "The maximum number of past WorkOrder execution statistics"
+ " entries for a query");
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+class Learner {
+ public:
+ /**
+  * @brief Constructor. All the members are default constructed, i.e. the
+  *        Learner starts without any query or priority level.
+  **/
+ Learner() {}
+
+ /**
+  * @brief Record the feedback from a completed normal work order.
+  *
+  * @note The execution time is added to the execution statistics of the
+  *       query the work order belongs to. The query must have been added
+  *       to the Learner before.
+  *
+  * @param workorder_completion_proto The work order completion message.
+  **/
+ void addCompletionFeedback(
+ const serialization::NormalWorkOrderCompletionMessage
+ &workorder_completion_proto);
+
+ /**
+  * @brief Start tracking a new query.
+  *
+  * @note Sets up the priority level of the query if needed, creates the
+  *       per-query data structures and then recomputes the default
+  *       probabilities.
+  *
+  * @param query_handle The query handle for the new query.
+  **/
+ void addQuery(const QueryHandle &query_handle) {
+   const std::size_t new_query_priority = query_handle.query_priority();
+   initializePriorityLevelIfNotPresent(new_query_priority);
+   initializeQuery(query_handle);
+   relearn();
+ }
+
+ /**
+  * @brief Stop tracking a query and release everything stored for it.
+  *
+  * @note If the query was the last one in its priority level, the priority
+  *       level is removed too. The default probabilities are recomputed
+  *       afterwards.
+  *
+  * @param query_id The ID of the query. It must be present in the Learner.
+  **/
+ void removeQuery(const std::size_t query_id) {
+   DCHECK(isQueryPresent(query_id));
+   const std::size_t priority_level = getQueryPriority(query_id);
+   // Find the iterator to the query in execution_stats_.
+   auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
+   execution_stats_[priority_level].erase(stats_iter_mutable);
+   current_probabilities_[priority_level]->removeObject(query_id);
+   // Forget the query itself. This has to happen after the calls above, as
+   // they resolve the priority level through this lookup table. Without it
+   // the query would still be counted as active after its removal.
+   query_id_to_priority_lookup_.erase(query_id);
+   checkAndRemovePriorityLevel(priority_level);
+   relearn();
+ }
+
+ /**
+  * @brief Remove an operator from the execution statistics of a query.
+  *
+  * @param query_id The ID of the query, which must be present.
+  * @param operator_id The ID of the operator to be removed.
+  **/
+ void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+   ExecutionStats *stats_of_query = getExecutionStats(query_id);
+   DCHECK(stats_of_query != nullptr);
+   stats_of_query->removeOperator(operator_id);
+ }
+
+ /**
+  * @brief Recompute the default probabilities of the queries and of the
+  *        priority levels. Does nothing when there are no active queries.
+  **/
+ void relearn() {
+   if (!hasActiveQueries()) {
+     return;
+   }
+   initializeDefaultProbabilitiesForAllQueries();
+   initializeDefaultProbabilitiesForPriorityLevels();
+ }
+
+ /**
+  * @brief Update the probability of the given query within its priority
+  *        level, based on the mean work order times of the queries in that
+  *        level.
+  *
+  * @param priority_level The priority level, which must be present.
+  * @param query_id The ID of the query whose probability is updated.
+  **/
+ void updateProbabilitiesForQueriesInPriorityLevel(
+ const std::size_t priority_level, const std::size_t query_id);
+
+ /**
+  * @brief Recompute the probabilities of the priority levels from the mean
+  *        work order times of their queries. Nothing is updated until
+  *        feedback is available for all the priority levels.
+  *
+  * @param priority_level A priority level.
+  **/
+ // TODO(harshad) - Cache internal results from previous invocation of this
+ // function and reuse them. There's a lot of redundancy in computations
+ // at this point.
+ void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+
+ private:
+ /**
+ * @brief Initialize the default probabilities for the queries.
+ *
+ * @note Every query in a priority level gets the same default probability.
+ **/
+ void initializeDefaultProbabilitiesForAllQueries();
+
+ /**
+ * @brief Initialize the default probabilities for the priority levels.
+ *
+ * @note Replaces the existing probability store for the priority levels.
+ **/
+ void initializeDefaultProbabilitiesForPriorityLevels();
+
+ /**
+  * @brief Initialize the data structures for a given priority level, if none
+  *        exist. If there are already data structures for the given priority
+  *        level, do nothing.
+  *
+  * @param priority_level The priority level.
+  **/
+ inline void initializePriorityLevelIfNotPresent(
+     const std::size_t priority_level) {
+   if (isPriorityLevelPresent(priority_level)) {
+     // Already initialized; leave the existing state untouched.
+     return;
+   }
+   current_probabilities_[priority_level].reset(new ProbabilityStore());
+   // The store for the priority levels is not created by the constructor, so
+   // it may still be missing when the very first priority level shows up.
+   if (probabilities_of_priority_levels_ == nullptr) {
+     probabilities_of_priority_levels_.reset(new ProbabilityStore());
+   }
+   // TODO(harshad) - Calculate the default probability for the priority level
+   // here and use it instead of the placeholder numerator 0.
+   probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+   // Create the (empty) vector of queries for this priority level.
+   execution_stats_[priority_level];
+ }
+
+ /**
+  * @brief Drop a priority level from all the local data structures, but only
+  *        if no query is left in it.
+  *
+  * @param priority_level The priority level, which must be present.
+  **/
+ inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
+   DCHECK(isPriorityLevelPresent(priority_level));
+   if (!execution_stats_[priority_level].empty()) {
+     // There are still queries in this priority level; keep it.
+     return;
+   }
+   execution_stats_.erase(priority_level);
+   current_probabilities_.erase(priority_level);
+   probabilities_of_priority_levels_->removeObject(priority_level);
+   has_feedback_from_all_queries_.erase(priority_level);
+ }
+
+ /**
+  * @brief Check if the Learner knows about the given priority level.
+  *
+  * @note In debug builds this also verifies that current_probabilities_ and
+  *       execution_stats_ agree on the presence of the priority level.
+  *
+  * @param priority_level The priority level.
+  *
+  * @return True if execution_stats_ has an entry for the priority level.
+  **/
+ inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
+   DCHECK_EQ(current_probabilities_.count(priority_level) == 0u,
+             execution_stats_.count(priority_level) == 0u);
+   return execution_stats_.count(priority_level) != 0u;
+ }
+
+ /**
+  * @brief Check if the given query is tracked by the Learner.
+  *
+  * @param query_id The ID of the query.
+  *
+  * @return True if query_id_to_priority_lookup_ has an entry for the query.
+  **/
+ inline bool isQueryPresent(const std::size_t query_id) const {
+   return query_id_to_priority_lookup_.count(query_id) > 0u;
+ }
+
+ /**
+  * @brief Create the per-query data structures for a new query.
+  *
+  * @note The priority level of the query must have been initialized already.
+  *
+  * @param query_handle The query handle for the new query.
+  **/
+ void initializeQuery(const QueryHandle &query_handle) {
+   const std::size_t level_of_query = query_handle.query_priority();
+   const std::size_t id_of_query = query_handle.query_id();
+   DCHECK(isPriorityLevelPresent(level_of_query));
+
+   // Remember which priority level the query belongs to.
+   query_id_to_priority_lookup_[id_of_query] = level_of_query;
+
+   // Start with empty execution statistics for the query.
+   std::unique_ptr<ExecutionStats> fresh_stats(
+       new ExecutionStats(FLAGS_max_past_entries_learner));
+   execution_stats_[level_of_query].emplace_back(id_of_query,
+                                                 std::move(fresh_stats));
+
+   // No feedback message can have arrived for a query that has just been
+   // created, hence its priority level no longer has feedback from all of
+   // its queries.
+   has_feedback_from_all_queries_[level_of_query] = false;
+ }
+
+ /**
+  * @brief Look up the execution statistics of a query.
+  *
+  * @param query_id The ID of the query.
+  *
+  * @return A pointer to the ExecutionStats of the query, or NULL if the query
+  *         is not present in the Learner.
+  **/
+ inline ExecutionStats* getExecutionStats(const std::size_t query_id) {
+   if (!isQueryPresent(query_id)) {
+     return nullptr;
+   }
+   const auto entry_for_query = getExecutionStatsIterMutable(query_id);
+   DCHECK(entry_for_query !=
+          std::end(execution_stats_[getQueryPriority(query_id)]));
+   return entry_for_query->second.get();
+ }
+
+ /**
+  * @brief Find the entry of a query in the vector of its priority level.
+  *
+  * @note The query and its priority level are expected to exist in the data
+  *       structures.
+  *
+  * @param query_id The ID of the query.
+  *
+  * @return An iterator to the (query ID, ExecutionStats) pair of the query,
+  *         or the end iterator of the vector if there is no such pair.
+  **/
+ inline std::vector<
+     std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
+ getExecutionStatsIterMutable(const std::size_t query_id) {
+   const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+       &queries_in_level = execution_stats_[getQueryPriority(query_id)];
+   // Linear scan until the entry with a matching query ID is reached.
+   auto candidate = queries_in_level.begin();
+   while (candidate != queries_in_level.end() &&
+          candidate->first != query_id) {
+     ++candidate;
+   }
+   return candidate;
+ }
+
+ /**
+  * @brief Get the priority level of a query.
+  *
+  * @param query_id The ID of the query, which must be present.
+  *
+  * @return The priority level stored for the query.
+  **/
+ inline const std::size_t getQueryPriority(const std::size_t query_id) const {
+   const auto lookup_entry = query_id_to_priority_lookup_.find(query_id);
+   DCHECK(lookup_entry != query_id_to_priority_lookup_.end());
+   const std::size_t level_of_query = lookup_entry->second;
+   return level_of_query;
+ }
+
+ /**
+  * @brief Check if every query in the given priority level reports stats.
+  *
+  * @param priority_level The priority level, which must be present in
+  *        execution_stats_.
+  *
+  * @return False as soon as one query in the priority level has no stats,
+  *         true otherwise.
+  **/
+ inline bool hasFeedbackFromAllQueriesInPriorityLevel(
+     const std::size_t priority_level) const {
+   for (const auto &query_entry : execution_stats_.at(priority_level)) {
+     DCHECK(query_entry.second != nullptr);
+     if (!query_entry.second->hasStats()) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * @brief Mark the given priority level as having feedback from all of its
+  *        queries, provided that every query in it reports stats. Otherwise
+  *        the flag is left as it is.
+  *
+  * @param priority_level The priority level, which must be present in
+  *        execution_stats_.
+  **/
+ inline void updateFeedbackFromQueriesInPriorityLevel(
+     const std::size_t priority_level) {
+   const auto &queries_in_level = execution_stats_.at(priority_level);
+   const bool every_query_has_stats = std::all_of(
+       queries_in_level.begin(),
+       queries_in_level.end(),
+       [](const std::pair<std::size_t, std::unique_ptr<ExecutionStats>> &p) {
+         DCHECK(p.second != nullptr);
+         return p.second->hasStats();
+       });
+   if (every_query_has_stats) {
+     // All the queries have at least one execution statistic.
+     has_feedback_from_all_queries_[priority_level] = true;
+   }
+ }
+
+ /**
+  * @brief Check if the Learner is tracking at least one query.
+  *
+  * @return True if query_id_to_priority_lookup_ is not empty.
+  **/
+ inline bool hasActiveQueries() const {
+   return !query_id_to_priority_lookup_.empty();
+ }
+
+ /**
+  * @brief Compute the mean work order execution time of every query in a
+  *        given priority level.
+  *
+  * @note The mean is the first element of the pair returned by
+  *       ExecutionStats::getCurrentStats() divided by the second one
+  *       (presumably total time and number of entries - confirm against
+  *       ExecutionStats). It is 0 when the second element is 0.
+  *
+  * @param priority_level The priority level, which must be present.
+  *
+  * @return An unordered_map in which: Key = query ID.
+  *         Value = Mean time per work order for that query.
+  **/
+ inline std::unordered_map<std::size_t, std::size_t>
+ getMeanWorkOrderTimesForQueriesInPriorityLevel(
+     const std::size_t priority_level) {
+   DCHECK(isPriorityLevelPresent(priority_level));
+   std::unordered_map<std::size_t, std::size_t> mean_time_of_query;
+   for (const auto &query_entry : execution_stats_[priority_level]) {
+     DCHECK(query_entry.second.get() != nullptr);
+     const auto current_stats = query_entry.second->getCurrentStats();
+     std::size_t mean_time = 0;
+     if (current_stats.second != 0) {
+       mean_time = current_stats.first / current_stats.second;
+     }
+     mean_time_of_query[query_entry.first] = mean_time;
+   }
+   return mean_time_of_query;
+ }
+
+ /**
+  * @brief Compute the sum of the inverses of the mean work order times.
+  *
+  * @param mean_workorder_per_query An unordered_map in which:
+  *        Key = query ID.
+  *        Value = Mean time per work order for that query.
+  *
+  * @note If any query has mean work order time as 0, we return 0 as the
+  *       denominator.
+  *
+  * @return The denominator to be used for probability calculations.
+  **/
+ inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
+                                       &mean_workorder_per_query) const {
+   float sum_of_inverse_means = 0;
+   for (const auto &query_and_mean : mean_workorder_per_query) {
+     if (query_and_mean.second == 0) {
+       // A single zero mean time makes the whole denominator unusable.
+       return 0;
+     }
+     sum_of_inverse_means += 1/static_cast<float>(query_and_mean.second);
+   }
+   return sum_of_inverse_means;
+ }
+
+ /**
+  * @brief Check if every priority level recorded in
+  *        has_feedback_from_all_queries_ has stats for all of its queries.
+  *
+  * @note The stored boolean values are not consulted; the check is redone
+  *       for each priority level.
+  **/
+ inline bool hasFeedbackFromAllPriorityLevels() const {
+   for (const auto &level_entry : has_feedback_from_all_queries_) {
+     const bool level_has_feedback =
+         hasFeedbackFromAllQueriesInPriorityLevel(level_entry.first);
+     if (!level_has_feedback) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ // Key = Priority level, value = A vector of pairs.
+ // Each pair:
+ // 1st element: Query ID.
+ // 2nd Element: Execution statistics for the query.
+ std::unordered_map<
+ std::size_t,
+ std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>>
+ execution_stats_;
+
+ // Key = query ID, value = priority level for the query ID.
+ std::unordered_map<std::size_t, std::size_t> query_id_to_priority_lookup_;
+
+ // Key = priority level, value = ProbabilityStore holding the current
+ // probabilities of the queries belonging to that priority level.
+ std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+ current_probabilities_;
+
+ // Key = priority level, value = ProbabilityStore holding the default
+ // probabilities of the queries belonging to that priority level. Rebuilt by
+ // initializeDefaultProbabilitiesForAllQueries().
+ std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+ default_probabilities_;
+
+ // ProbabilityStore for probabilities mapped to the priority levels.
+ // Not created by the constructor; it is created by
+ // initializeDefaultProbabilitiesForPriorityLevels().
+ std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+
+ // Key = priority level. Value = A boolean that indicates if we have received
+ // feedback from all the queries in the given priority level.
+ // TODO(harshad) - Invalidate the cache whenever needed.
+ std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+
+ DISALLOW_COPY_AND_ASSIGN(Learner);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index db7206b..ff734ca 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
@@ -42,6 +43,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
" the workers.");
bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+ Learner learner;
if (admitted_queries_.size() < kMaxConcurrentQueries) {
// Ok to admit the query.
const std::size_t query_id = query_handle->query_id();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 8343d24..d31caa6 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -22,6 +22,7 @@
#include <cstddef>
#include <random>
#include <unordered_map>
+#include <utility>
#include <vector>
#include "utility/Macros.hpp"
@@ -40,7 +41,7 @@ class ProbabilityStore {
* @brief Constructor.
**/
ProbabilityStore()
- : mt_(std::random_device()()) {}
+ : common_denominator_(1.0), mt_(std::random_device()()) {}
/**
* @brief Get the number of objects in the store.
@@ -50,6 +51,10 @@ class ProbabilityStore {
return individual_probabilities_.size();
}
+ /**
+  * @brief Get the common denominator of the store.
+  *
+  * @note The denominator is assigned from float values elsewhere in this
+  *       class, so it is returned as a float; returning an integer would
+  *       truncate fractional denominators (e.g. to 0 when below 1).
+  **/
+ inline float getDenominator() const {
+   return common_denominator_;
+ }
+
/**
* @brief Add individual (not cumulative) probability for a given object.
*
@@ -59,16 +64,48 @@ class ProbabilityStore {
* @note This function may override previously written probability values.
*
* @param property The property of the given object.
- * @param individual_probability The individual (not cumulative) probability
- * of the given object.
+ * @param numerator The numerator for the given object.
**/
- void addProbability(const std::size_t property,
- const float individual_probability) {
- individual_probabilities_[property] = individual_probability;
+ void addOrUpdateObject(const std::size_t property,
+ const float numerator) {
+ DCHECK_LE(numerator, common_denominator_);
+ // We should have the correct individual probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we rely on the probabilities for all the objects in
+ // updateCumulativeProbabilities().
+ individual_probabilities_[property] =
+ std::make_pair(numerator, numerator / common_denominator_);
updateCumulativeProbabilities();
}
/**
+ * @brief Add individual (not cumulative) probability for a given object with
+ * updated denominator.
+ *
+ * @note This function leaves the cumulative probabilities in a consistent
+ * state. An alternative lazy implementation should be written if cost
+ * of calculating cumulative probabilities is high.
+ * @note This function may override previously written probability values.
+ *
+ * @param property The property of the given object.
+ * @param numerator The numerator for the given object.
+ * @param new_denominator The updated denominator for the store.
+ **/
+ void addOrUpdateObjectNewDenominator(const std::size_t property,
+ const float numerator,
+ const float new_denominator) {
+ // A non-positive denominator aborts, in release builds too (CHECK).
+ CHECK_GT(new_denominator, 0u);
+ // Debug-only sanity check: a numerator above the denominator would give a
+ // probability greater than 1.
+ DCHECK_LE(numerator, new_denominator);
+ common_denominator_ = new_denominator;
+ // It is alright to not have the correct probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we compute the probabilities for all the objects in
+ // updateProbabilitiesNewDenominator().
+ // The 0.0 stored below is only a placeholder for that probability.
+ individual_probabilities_[property] = std::make_pair(numerator, 0.0);
+ updateProbabilitiesNewDenominator();
+ }
+
+ /**
* @brief Add individual (not cumulative) probabilities for given objects.
*
* @note This function leaves the cumulative probabilities in a consistent
@@ -77,30 +114,40 @@ class ProbabilityStore {
* @note This function may override previously written probability values.
*
* @param properties A vector of properties to be added.
- * @param individual_probabilities The individual (not cumulative)
- * probabilities of the given objects.
+ * @param numerators The numerators of the given objects.
**/
- void addProbabilities(const std::vector<std::size_t> &properties,
- const std::vector<float> &individual_probabilities) {
- DCHECK_EQ(properties.size(), individual_probabilities.size());
+ void addOrUpdateObjects(const std::vector<std::size_t> &properties,
+ const std::vector<float> &numerators) {
+ DCHECK_EQ(properties.size(), numerators.size());
for (std::size_t i = 0; i < properties.size(); ++i) {
- individual_probabilities_[properties[i]] = individual_probabilities[i];
+ DCHECK_LE(numerators[i], common_denominator_);
+ // We should have the correct individual probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we rely on the probabilities for all the objects in
+ // updateCumulativeProbabilities().
+ individual_probabilities_[properties[i]] =
+ std::make_pair(numerators[i], numerators[i] / common_denominator_);
}
updateCumulativeProbabilities();
}
- /**
- * @brief Update the probability of a given object to a new value.
- *
- * @param property The property of the object.
- * @param new_individual_probability The new probability to be set.
- **/
- void updateProbability(const std::size_t property,
- const float new_individual_probability) {
- auto it = individual_probabilities_.find(property);
- DCHECK(it != individual_probabilities_.end());
- it->second = new_individual_probability;
- updateCumulativeProbabilities();
+ void addOrUpdateObjectsNewDenominator(
+ const std::vector<std::size_t> &properties,
+ const std::vector<float> &numerators,
+ const float new_denominator) {
+ CHECK_GT(new_denominator, 0u);
+ DCHECK_EQ(properties.size(), numerators.size());
+ common_denominator_ = new_denominator;
+ for (std::size_t i = 0; i < properties.size(); ++i) {
+ DCHECK_LE(numerators[i], common_denominator_);
+ // It is alright to not have the correct probability in
+ // individual_probabilities_ for the newly added object at this point.
+ // Because we compute the probabilities for all the objects in
+ // updateProbabilitiesNewDenominator().
+ individual_probabilities_[properties[i]] =
+ std::make_pair(numerators[i], 0.0);
+ }
+ updateProbabilitiesNewDenominator();
}
/**
@@ -109,10 +156,24 @@ class ProbabilityStore {
* @param property The property of the object to be removed.
**/
void removeObject(const std::size_t property) {
- auto it = individual_probabilities_.find(property);
- DCHECK(it != individual_probabilities_.end());
- individual_probabilities_.erase(it);
- updateCumulativeProbabilities();
+ auto individual_it = individual_probabilities_.find(property);
+ DCHECK(individual_it != individual_probabilities_.end());
+ individual_probabilities_.erase(individual_it);
+ if (!individual_probabilities_.empty()) {
+ float new_denominator = 0;
+ for (auto it = individual_probabilities_.begin();
+ it != individual_probabilities_.end();
+ ++it) {
+ new_denominator += it->second.first;
+ }
+ CHECK_GT(new_denominator, 0);
+ common_denominator_ = new_denominator;
+ updateCumulativeProbabilities();
+ } else {
+ // In order to keep the store consistent, we should keep the sizes of
+ // individual_probabilities_ and cumulative_probabilities_ the same.
+ cumulative_probabilities_.clear();
+ }
}
/**
@@ -123,7 +184,7 @@ class ProbabilityStore {
const float getIndividualProbability(const std::size_t property) const {
const auto it = individual_probabilities_.find(property);
DCHECK(it != individual_probabilities_.end());
- return it->second;
+ return it->second.second;
}
/**
@@ -141,13 +202,13 @@ class ProbabilityStore {
return;
}
float cumulative_probability = 0;
- for (const auto property_probability_pair : individual_probabilities_) {
- cumulative_probabilities_.emplace_back(property_probability_pair.first,
+ for (const auto p : individual_probabilities_) {
+ cumulative_probabilities_.emplace_back(p.first,
cumulative_probability);
- cumulative_probability += property_probability_pair.second;
+ cumulative_probability += p.second.second;
}
- // Adjust the last cumulative probability manually to 1.0, so that floating
- // addition related rounding issues are ignored.
+ // Adjust the last cumulative probability manually to 1.0, so that
+ // floating addition related rounding issues are ignored.
cumulative_probabilities_.back().updateProbability(1.0);
}
@@ -208,9 +269,26 @@ class ProbabilityStore {
return it->property_;
}
- std::unordered_map<std::size_t, float> individual_probabilities_;
+ inline void updateProbabilitiesNewDenominator() {
+ // First update the individual probabilities.
+ for (auto it = individual_probabilities_.begin();
+ it != individual_probabilities_.end();
+ ++it) {
+ DCHECK_LE(it->second.first, common_denominator_);
+ it->second.second = it->second.first / common_denominator_;
+ }
+ updateCumulativeProbabilities();
+ }
+
+ // Key = property of the object.
+ // Value = A pair ...
+ // 1st element: Numerator of the object.
+ // 2nd element: Individual probability of the object.
+ std::unordered_map<std::size_t, std::pair<float, float>> individual_probabilities_;
std::vector<ProbabilityInfo> cumulative_probabilities_;
+ float common_denominator_;
+
std::mt19937_64 mt_;
DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index e624557..dcec1e5 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -28,14 +28,15 @@ TEST(ProbabilityStoreTest, CountTest) {
ProbabilityStore store;
EXPECT_EQ(0u, store.getNumObjects());
const std::size_t kProperty = 0;
- store.addProbability(kProperty, 0.5);
+ store.addOrUpdateObject(kProperty, 1);
EXPECT_EQ(1u, store.getNumObjects());
store.removeObject(kProperty);
EXPECT_EQ(0u, store.getNumObjects());
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
- store.addProbabilities(objects, probabilities);
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
EXPECT_EQ(objects.size(), store.getNumObjects());
}
@@ -43,11 +44,12 @@ TEST(ProbabilityStoreTest, CountTest) {
TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
- store.addProbabilities(objects, probabilities);
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
- EXPECT_EQ(probabilities[object_num],
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
store.getIndividualProbability(objects[object_num]));
}
}
@@ -55,8 +57,9 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
ProbabilityStore store;
std::vector<std::size_t> objects {3, 5, 7, 9};
- std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
- store.addProbabilities(objects, probabilities);
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
const std::size_t kNumTrials = 10;
while (!objects.empty()) {
@@ -72,4 +75,30 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
}
}
+TEST(ProbabilityStoreTest, RemoveObjectTest) {
+ ProbabilityStore store;
+ std::vector<std::size_t> objects {3, 5, 7, 9};
+ std::vector<float> numerators {1, 2, 3, 5};
+ const std::size_t kNewDenominator = 10;
+ store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+ for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+ store.getIndividualProbability(objects[object_num]));
+ }
+
+ // Remove last object "9", with numerator 5.
+ store.removeObject(objects.back());
+ objects.pop_back();
+ numerators.pop_back();
+ const float expected_new_denominator =
+ std::accumulate(numerators.begin(), numerators.end(), 0);
+
+ EXPECT_EQ(expected_new_denominator, store.getDenominator());
+ for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+ EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+ store.getIndividualProbability(objects[object_num]));
+ }
+}
+
} // namespace quickstep
[06/11] incubator-quickstep git commit: Created a class for storing
probabilities.
Posted by hb...@apache.org.
Created a class for storing probabilities.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1b24694b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1b24694b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1b24694b
Branch: refs/heads/scheduler++
Commit: 1b24694bf99665d7e151d8ded751a258de000276
Parents: 8e973b8
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 21 11:45:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 13 ++
query_execution/PolicyEnforcer.cpp | 1 +
query_execution/ProbabilityStore.hpp | 223 +++++++++++++++++++
.../tests/ProbabilityStore_unittest.cpp | 75 +++++++
4 files changed, 312 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index fcd4f48..18ae0da 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -36,6 +36,7 @@ add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionSt
add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
add_library(quickstep_queryexecution_QueryContext_proto
${queryexecution_QueryContext_proto_srcs}
@@ -97,6 +98,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
glog
quickstep_queryexecution_ExecutionStats
quickstep_catalog_CatalogTypedefs
+ quickstep_queryexecution_ProbabilityStore
quickstep_queryexecution_QueryExecutionMessages_proto
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_queryexecution_QueryManager
@@ -106,6 +108,9 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
quickstep_relationaloperators_WorkOrder
quickstep_utility_Macros
tmb)
+target_link_libraries(quickstep_queryexecution_ProbabilityStore
+ glog
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_QueryContext
glog
quickstep_catalog_CatalogDatabaseLite
@@ -252,6 +257,14 @@ if (ENABLE_DISTRIBUTED)
add_test(BlockLocator_unittest BlockLocator_unittest)
endif()
+add_executable(ProbabilityStore_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
+target_link_libraries(ProbabilityStore_unittest
+ gtest
+ gtest_main
+ quickstep_queryexecution_ProbabilityStore)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
+
add_executable(QueryManager_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
target_link_libraries(QueryManager_unittest
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 84aa86a..db7206b 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryManager.hpp"
#include "query_execution/WorkerDirectory.hpp"
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
new file mode 100644
index 0000000..8343d24
--- /dev/null
+++ b/query_execution/ProbabilityStore.hpp
@@ -0,0 +1,223 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <random>
+#include <unordered_map>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief A class that stores the probabilities of objects. We use a field
+ * called "property" to identify each object.
+ **/
+class ProbabilityStore {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ ProbabilityStore()
+ : mt_(std::random_device()()) {}
+
+ /**
+ * @brief Get the number of objects in the store.
+ **/
+ const std::size_t getNumObjects() const {
+ DCHECK_EQ(individual_probabilities_.size(), cumulative_probabilities_.size());
+ return individual_probabilities_.size();
+ }
+
+ /**
+ * @brief Add individual (not cumulative) probability for a given object.
+ *
+ * @note This function leaves the cumulative probabilities in a consistent
+ * state. An alternative lazy implementation should be written if cost
+ * of calculating cumulative probabilities is high.
+ * @note This function may override previously written probability values.
+ *
+ * @param property The property of the given object.
+ * @param individual_probability The individual (not cumulative) probability
+ * of the given object.
+ **/
+ void addProbability(const std::size_t property,
+ const float individual_probability) {
+ individual_probabilities_[property] = individual_probability;
+ updateCumulativeProbabilities();
+ }
+
+ /**
+ * @brief Add individual (not cumulative) probabilities for given objects.
+ *
+ * @note This function leaves the cumulative probabilities in a consistent
+ * state. An alternative lazy implementation should be written if cost
+ * of calculating cumulative probabilities is high.
+ * @note This function may override previously written probability values.
+ *
+ * @param properties A vector of properties to be added.
+ * @param individual_probabilities The individual (not cumulative)
+ * probabilities of the given objects.
+ **/
+ void addProbabilities(const std::vector<std::size_t> &properties,
+ const std::vector<float> &individual_probabilities) {
+ DCHECK_EQ(properties.size(), individual_probabilities.size());
+ for (std::size_t i = 0; i < properties.size(); ++i) {
+ individual_probabilities_[properties[i]] = individual_probabilities[i];
+ }
+ updateCumulativeProbabilities();
+ }
+
+ /**
+ * @brief Update the probability of a given object to a new value.
+ *
+ * @param property The property of the object.
+ * @param new_individual_probability The new probability to be set.
+ **/
+ void updateProbability(const std::size_t property,
+ const float new_individual_probability) {
+ auto it = individual_probabilities_.find(property);
+ DCHECK(it != individual_probabilities_.end());
+ it->second = new_individual_probability;
+ updateCumulativeProbabilities();
+ }
+
+ /**
+ * @brief Remove an object from the store.
+ *
+ * @param property The property of the object to be removed.
+ **/
+ void removeObject(const std::size_t property) {
+ auto it = individual_probabilities_.find(property);
+ DCHECK(it != individual_probabilities_.end());
+ individual_probabilities_.erase(it);
+ updateCumulativeProbabilities();
+ }
+
+ /**
+ * @brief Get the individual probability (not cumulative) for an object.
+ *
+ * @param property The property of the object.
+ **/
+ const float getIndividualProbability(const std::size_t property) const {
+ const auto it = individual_probabilities_.find(property);
+ DCHECK(it != individual_probabilities_.end());
+ return it->second;
+ }
+
+ /**
+ * @brief Update the cumulative probabilities.
+ *
+ * @note This function should be called upon if there are any updates,
+ * additions or deletions to the individual probabilities.
+ * @note An efficient implementation should be written if there are large
+ * number of objects.
+ **/
+ void updateCumulativeProbabilities() {
+ cumulative_probabilities_.clear();
+ if (individual_probabilities_.empty()) {
+ // No need to modify the cumulative probabilities.
+ return;
+ }
+ float cumulative_probability = 0;
+ for (const auto property_probability_pair : individual_probabilities_) {
+ cumulative_probabilities_.emplace_back(property_probability_pair.first,
+ cumulative_probability);
+ cumulative_probability += property_probability_pair.second;
+ }
+ // Adjust the last cumulative probability manually to 1.0, so that floating
+ // addition related rounding issues are ignored.
+ cumulative_probabilities_.back().updateProbability(1.0);
+ }
+
+ /**
+ * @brief Return a randomly chosen property.
+ *
+ * @note The random number is uniformly chosen.
+ **/
+ inline const std::size_t pickRandomProperty() {
+ std::uniform_real_distribution<float> dist(0.0, 1.0);
+ const float chosen_probability = dist(mt_);
+ return getPropertyForProbability(chosen_probability);
+ }
+
+ private:
+ class ProbabilityInfo {
+ public:
+ ProbabilityInfo(const std::size_t property, const float probability)
+ : property_(property), probability_(probability) {
+ DCHECK_LE(probability, 1.0);
+ }
+
+ ProbabilityInfo(const ProbabilityInfo &other) = default;
+
+ ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
+
+ void updateProbability(const float new_probability) {
+ DCHECK_LE(new_probability, 1.0);
+ probability_ = new_probability;
+ }
+
+ std::size_t property_;
+ float probability_;
+ };
+
+ /**
+ * @brief Get a property for a given cumulative probability.
+ *
+ * @param key_cumulative_probability The input cumulative probability.
+ *
+ * @return The object that has a cumulative probability that is greater than
+ * or equal to the input cumulative probability.
+ **/
+ inline const std::size_t getPropertyForProbability(
+ const float key_cumulative_probability) {
+ DCHECK(!cumulative_probabilities_.empty());
+ // It doesn't matter in which order the objects are arranged in the
+ // cumulative_probabilities_ vector.
+ ProbabilityInfo search_key(0, key_cumulative_probability);
+ const auto it = std::upper_bound(
+ cumulative_probabilities_.begin(),
+ cumulative_probabilities_.end(),
+ search_key,
+ [](const ProbabilityInfo &a, const ProbabilityInfo &b) {
+ return a.probability_ < b.probability_;
+ });
+ DCHECK(it != std::end(cumulative_probabilities_));
+ return it->property_;
+ }
+
+ std::unordered_map<std::size_t, float> individual_probabilities_;
+ std::vector<ProbabilityInfo> cumulative_probabilities_;
+
+ std::mt19937_64 mt_;
+
+ DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
new file mode 100644
index 0000000..e624557
--- /dev/null
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <cstddef>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/ProbabilityStore.hpp"
+
+namespace quickstep {
+
+TEST(ProbabilityStoreTest, CountTest) {
+ ProbabilityStore store;
+ EXPECT_EQ(0u, store.getNumObjects());
+ const std::size_t kProperty = 0;
+ store.addProbability(kProperty, 0.5);
+ EXPECT_EQ(1u, store.getNumObjects());
+ store.removeObject(kProperty);
+ EXPECT_EQ(0u, store.getNumObjects());
+
+ std::vector<std::size_t> objects {3, 5, 7, 9};
+ std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
+ store.addProbabilities(objects, probabilities);
+
+ EXPECT_EQ(objects.size(), store.getNumObjects());
+}
+
+TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
+ ProbabilityStore store;
+ std::vector<std::size_t> objects {3, 5, 7, 9};
+ std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
+ store.addProbabilities(objects, probabilities);
+
+ for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+ EXPECT_EQ(probabilities[object_num],
+ store.getIndividualProbability(objects[object_num]));
+ }
+}
+
+TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
+ ProbabilityStore store;
+ std::vector<std::size_t> objects {3, 5, 7, 9};
+ std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
+ store.addProbabilities(objects, probabilities);
+
+ const std::size_t kNumTrials = 10;
+ while (!objects.empty()) {
+ for (std::size_t trial_num = 0; trial_num < kNumTrials; ++trial_num) {
+ const std::size_t picked_property = store.pickRandomProperty();
+ const auto it = std::find(objects.begin(), objects.end(), picked_property);
+ EXPECT_TRUE(it != objects.end());
+ }
+ const std::size_t property_to_be_removed = objects.back();
+ store.removeObject(property_to_be_removed);
+ objects.pop_back();
+ EXPECT_EQ(objects.size(), store.getNumObjects());
+ }
+}
+
+} // namespace quickstep
[02/11] incubator-quickstep git commit: Created unit test for Learner
Posted by hb...@apache.org.
Created unit test for Learner
- API changes for probability store.
- Check if there's a probability entry for the query to be removed.
- Bug fix in remove query.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d5c6c9dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d5c6c9dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d5c6c9dd
Branch: refs/heads/scheduler++
Commit: d5c6c9ddbf3b2eece2952b2fe2c98960893e544a
Parents: 75ec7f0
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 23:03:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500
----------------------------------------------------------------------
query_execution/CMakeLists.txt | 9 ++++
query_execution/Learner.cpp | 6 ++-
query_execution/Learner.hpp | 33 ++++++++++-----
query_execution/ProbabilityStore.hpp | 5 +++
query_execution/tests/Learner_unittest.cpp | 55 +++++++++++++++++++++++++
5 files changed, 96 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index cb0f815..3904185 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -268,6 +268,15 @@ if (ENABLE_DISTRIBUTED)
add_test(BlockLocator_unittest BlockLocator_unittest)
endif()
+add_executable(Learner_unittest
+ "${CMAKE_CURRENT_SOURCE_DIR}/tests/Learner_unittest.cpp")
+target_link_libraries(Learner_unittest
+ gtest
+ gtest_main
+ quickstep_queryexecution_Learner
+ quickstep_queryoptimizer_QueryHandle)
+add_test(Learner_unittest Learner_unittest)
+
add_executable(ProbabilityStore_unittest
"${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
target_link_libraries(ProbabilityStore_unittest
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 72c68f0..5d877b4 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -188,8 +188,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
priority_levels.emplace_back(priority_iter->first);
numerators.emplace_back(priority_iter->first);
}
- probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
- priority_levels, numerators, sum_priority_levels);
+ if (sum_priority_levels > 0) {
+ probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+ priority_levels, numerators, sum_priority_levels);
+ }
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 64120a7..9d51877 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -35,10 +35,10 @@
namespace quickstep {
-DEFINE_int32(max_past_entries_learner,
+/*DECLARE_int32(max_past_entries_learner,
10,
"The maximum number of past WorkOrder execution statistics"
- " entries for a query");
+ " entries for a query");*/
/** \addtogroup QueryExecution
* @{
*/
@@ -49,6 +49,7 @@ class Learner {
* @brief Constructor.
**/
Learner() {
+ probabilities_of_priority_levels_.reset(new ProbabilityStore());
}
void addCompletionFeedback(
@@ -67,7 +68,15 @@ class Learner {
const std::size_t priority_level = getQueryPriority(query_id);
auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
execution_stats_[priority_level].erase(stats_iter_mutable);
- current_probabilities_[priority_level]->removeObject(query_id);
+ DCHECK(current_probabilities_.find(priority_level) !=
+ current_probabilities_.end());
+ if (current_probabilities_[priority_level]->hasObject(query_id)) {
+ // We may have cases when a query doesn't produce any feedback message,
+ // therefore we may not have an entry for this query in the
+ // current_probabilities_[priority_level] ProbabilityStore.
+ current_probabilities_[priority_level]->removeObject(query_id);
+ }
+ query_id_to_priority_lookup_.erase(query_id);
checkAndRemovePriorityLevel(priority_level);
relearn();
}
@@ -93,6 +102,10 @@ class Learner {
// at this point.
void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+ inline const std::size_t hasActiveQueries() const {
+ return !query_id_to_priority_lookup_.empty();
+ }
+
private:
/**
* @brief Initialize the default probabilities for the queries.
@@ -111,12 +124,15 @@ class Learner {
**/
inline void initializePriorityLevelIfNotPresent(
const std::size_t priority_level) {
- if (isPriorityLevelPresent(priority_level)) {
+ if (!isPriorityLevelPresent(priority_level)) {
current_probabilities_[priority_level].reset(new ProbabilityStore());
// Calculate the default probability for the priority level here and use
// it instead of 0.5 here.
// TODO(harshad) - Correct this.
- probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+ /*const float new_denominator =
+ probabilities_of_priority_levels_[priority_level]->getDenominator();
+ probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
+ priority_level, priority_level, new_denominator);*/
execution_stats_[priority_level];
}
}
@@ -168,7 +184,8 @@ class Learner {
execution_stats_[priority_level].emplace_back(
query_id,
std::unique_ptr<ExecutionStats>(
- new ExecutionStats(FLAGS_max_past_entries_learner)));
+ // new ExecutionStats(FLAGS_max_past_entries_learner)));
+ new ExecutionStats(10)));
// As we are initializing the query, we obviously haven't gotten any
// feedback message for this query. Hence mark the following field as false.
has_feedback_from_all_queries_[priority_level] = false;
@@ -251,10 +268,6 @@ class Learner {
has_feedback_from_all_queries_[priority_level] = true;
}
- inline const std::size_t hasActiveQueries() const {
- return !query_id_to_priority_lookup_.empty();
- }
-
/**
* @brief Get the mean work order execution times for all the queries in
* a given priority level.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index d31caa6..347df89 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -55,6 +55,11 @@ class ProbabilityStore {
return common_denominator_;
}
+ inline bool hasObject(const std::size_t property) const {
+ auto it = individual_probabilities_.find(property);
+ return (it != individual_probabilities_.end());
+ }
+
/**
* @brief Add individual (not cumulative) probability for a given object.
*
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
new file mode 100644
index 0000000..cab241a
--- /dev/null
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -0,0 +1,55 @@
+/**
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+#include <memory>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/Learner.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+namespace quickstep {
+
+TEST(LearnerTest, AddQueryTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle;
+ handle.reset(new QueryHandle(1, 1));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ learner.addQuery(*handle);
+ EXPECT_TRUE(learner.hasActiveQueries());
+}
+
+TEST(LearnerTest, RemoveQueryTest) {
+ Learner learner;
+ std::unique_ptr<QueryHandle> handle;
+ handle.reset(new QueryHandle(1, 1));
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ learner.addQuery(*handle);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ learner.removeQuery(handle->query_id());
+ EXPECT_FALSE(learner.hasActiveQueries());
+
+ handle.reset(new QueryHandle(2, 1));
+ learner.addQuery(*handle);
+ EXPECT_TRUE(learner.hasActiveQueries());
+ learner.removeQuery(handle->query_id());
+ EXPECT_FALSE(learner.hasActiveQueries());
+}
+
+} // namespace quickstep
[07/11] incubator-quickstep git commit: Fixed getDenominator method.
More unit tests.
Posted by hb...@apache.org.
Fixed getDenominator method. More unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2a0def65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2a0def65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2a0def65
Branch: refs/heads/scheduler++
Commit: 2a0def654a6d4f60363b41eea6d4755c250f977e
Parents: 5514e00
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sat Jun 25 09:35:10 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500
----------------------------------------------------------------------
query_execution/ExecutionStats.hpp | 15 +-
query_execution/Learner.cpp | 46 +++--
query_execution/Learner.hpp | 147 ++++++++++++--
query_execution/ProbabilityStore.hpp | 15 +-
query_execution/QueryExecutionTypedefs.hpp | 1 +
query_execution/tests/Learner_unittest.cpp | 259 +++++++++++++++++++++++-
6 files changed, 440 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index 769c7a4..5643749 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,17 +58,17 @@ class ExecutionStats {
}
/**
- * @brief Check if there are any stats present.
+ * @brief Check if there are stats present for at least one active operator.
**/
inline bool hasStats() const {
for (auto it = active_operators_.begin();
it != active_operators_.end();
++it) {
- if (!it->second->hasStatsForOperator()) {
- return false;
+ if (it->second->hasStatsForOperator()) {
+ return true;
}
}
- return true;
+ return false;
}
/**
@@ -109,14 +109,13 @@ class ExecutionStats {
* @param operator_index The operator index which the value belongs to.
**/
void addEntry(std::size_t value, std::size_t operator_index) {
- if (hasOperator(operator_index)) {
- // This is not the first entry for the given operator.
- active_operators_[operator_index]->addEntry(value);
- } else {
+ if (!hasOperator(operator_index)) {
+ // This is the first entry for the given operator.
// Create the OperatorStats object for this operator.
active_operators_[operator_index] =
std::unique_ptr<OperatorStats>(new OperatorStats(max_entries_));
}
+ active_operators_[operator_index]->addEntry(value);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 720df33..11c3735 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,7 +26,6 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
@@ -76,10 +75,20 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
DCHECK(current_probabilities_[priority_level] != nullptr);
// As we want the probability of the lone query in this priority level as
// 1, we set the numerator same as denominator.
- const std::size_t numerator =
- current_probabilities_[priority_level]->getDenominator();
- current_probabilities_[priority_level]->addOrUpdateObject(query_id,
- numerator);
+ // TODO(harshad) - Get the mean work order times here too and use that as
+ // the numerator.
+ ExecutionStats *stats = getExecutionStats(query_id);
+ auto query_stats = stats->getCurrentStats();
+ /*const std::size_t numerator =
+ current_probabilities_[priority_level]->getDenominator();*/
+ if (query_stats.second != 0) {
+ const float mean_workorder_time =
+ query_stats.first / static_cast<float>(query_stats.second);
+ if (mean_workorder_time != 0) {
+ current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+ query_id, 1 / mean_workorder_time, 1 / mean_workorder_time);
+ }
+ }
return;
}
// Else, there is more than one query for the given priority level.
@@ -92,18 +101,25 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
// queries.
DCHECK(mean_workorders_per_query.find(query_id) !=
mean_workorders_per_query.end());
+ DCHECK_NE(mean_workorders_per_query[query_id], 0);
current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
- query_id, mean_workorders_per_query[query_id], denominator);
+ query_id,
+ 1 / static_cast<float>(mean_workorders_per_query[query_id]),
+ denominator);
+ // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
} else {
// At least one of the queries has predicted time for next work order as 0.
// In such a case, we don't update the probabilities and continue to use
// the older probabilities.
+ // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
+ return;
}
}
void Learner::updateProbabilitiesOfAllPriorityLevels() {
- if (!hasFeedbackFromAllPriorityLevels() ||
- has_feedback_from_all_queries_.empty()) {
+ if (!hasFeedbackFromAllPriorityLevels()) {
+ // has_feedback_from_all_queries_.empty()) {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
// Either we don't have enough feedback messages from all the priority
// levels OR there are no active queries in the system.
return;
@@ -111,9 +127,11 @@ void Learner::updateProbabilitiesOfAllPriorityLevels() {
// Compute the predicted work order execution times for all the levels.
std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
std::size_t sum_active_priorities = 0;
- for (auto priority_iter : has_feedback_from_all_queries_) {
+ for (auto priority_iter = execution_stats_.begin();
+ priority_iter != execution_stats_.end();
+ ++priority_iter) {
std::size_t total_time_curr_level = 0;
- const std::size_t curr_priority_level = priority_iter.first;
+ const std::size_t curr_priority_level = priority_iter->first;
sum_active_priorities += curr_priority_level;
// For each query, find its predicted work order execution time.
const std::unordered_map<std::size_t, std::size_t>
@@ -194,6 +212,7 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
for (auto priority_iter = execution_stats_.cbegin();
priority_iter != execution_stats_.cend();
++priority_iter) {
+ DCHECK(!priority_iter->second.empty());
const std::size_t curr_priority_level = priority_iter->first;
sum_priority_levels += curr_priority_level;
priority_levels.emplace_back(curr_priority_level);
@@ -217,7 +236,8 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
new ExecutionStats(FLAGS_max_past_entries_learner)));
// As we are initializing the query, we obviously haven't gotten any
// feedback message for this query. Hence mark the following field as false.
- has_feedback_from_all_queries_[priority_level] = false;
+ // has_feedback_from_all_queries_[priority_level] = false;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
}
void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
@@ -226,7 +246,9 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
execution_stats_.erase(priority_level);
current_probabilities_.erase(priority_level);
probabilities_of_priority_levels_->removeObject(priority_level);
- has_feedback_from_all_queries_.erase(priority_level);
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_.erase(priority_level);
+ // LOG(INFO) << "Removed priority level: " << priority_level;
if (hasActiveQueries()) {
if (static_cast<int>(priority_level) == highest_priority_level_) {
// The priority level to be removed is the highest priority level.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index f99b1c6..2c6fdef 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -27,11 +27,14 @@
#include "query_execution/ExecutionStats.hpp"
#include "query_execution/ProbabilityStore.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "utility/Macros.hpp"
#include "glog/logging.h"
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
namespace quickstep {
/** \addtogroup QueryExecution
@@ -49,16 +52,26 @@ class Learner {
const serialization::NormalWorkOrderCompletionMessage
&workorder_completion_proto);
+ /**
+ * @brief Add a query to the Learner.
+ *
+ * @param query_handle The query handle for the new query.
+ **/
void addQuery(const QueryHandle &query_handle) {
initializePriorityLevelIfNotPresent(query_handle.query_priority());
initializeQuery(query_handle);
relearn();
}
+ /**
+ * @brief Remove a query from the Learner.
+ *
+ * @param query_id The ID of the query to be removed.
+ **/
void removeQuery(const std::size_t query_id) {
- // Find the iterator to the query in execution_stats_.
DCHECK(isQueryPresent(query_id));
const std::size_t priority_level = getQueryPriority(query_id);
+ // Find the iterator to the query in execution_stats_.
auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
execution_stats_[priority_level].erase(stats_iter_mutable);
DCHECK(current_probabilities_.find(priority_level) !=
@@ -69,17 +82,25 @@ class Learner {
// current_probabilities_[priority_level] ProbabilityStore.
current_probabilities_[priority_level]->removeObject(query_id);
}
+ // has_feedback_from_all_queries_[priority_level] = false;
query_id_to_priority_lookup_.erase(query_id);
checkAndRemovePriorityLevel(priority_level);
relearn();
}
- void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+ /**
+ * @brief Remove the stats of a given operator in a given query.
+ **/
+ void removeOperator(const std::size_t query_id,
+ const std::size_t operator_id) {
ExecutionStats *stats = getExecutionStats(query_id);
DCHECK(stats != nullptr);
stats->removeOperator(operator_id);
}
+ /**
+ * @brief Reset the probabilities and start learning again.
+ **/
void relearn() {
if (hasActiveQueries()) {
initializeDefaultProbabilitiesForAllQueries();
@@ -87,10 +108,17 @@ class Learner {
}
}
+ /**
+ * @brief Check if there are any active queries in the Learner.
+ **/
inline const bool hasActiveQueries() const {
return !query_id_to_priority_lookup_.empty();
}
+ /**
+ * @brief Get the number of active queries in the Learner for the given
+ * priority level.
+ **/
inline const std::size_t getNumActiveQueriesInPriorityLevel(
const std::size_t priority_level) const {
const auto it = execution_stats_.find(priority_level);
@@ -101,6 +129,9 @@ class Learner {
}
}
+ /**
+ * @brief Get the total number of active queries in the Learner.
+ **/
inline const std::size_t getTotalNumActiveQueries() const {
return query_id_to_priority_lookup_.size();
}
@@ -115,6 +146,83 @@ class Learner {
return highest_priority_level_;
}
+ /**
+ * @brief Randomly pick a priority level.
+ *
+ * @note A uniformly chosen random number is mapped to a level via the stored probabilities.
+ *
+ * @return A priority level. If no queries are present in the learner, return
+ * kInvalidPriorityLevel.
+ **/
+ inline const int pickRandomPriorityLevel() const {
+ if (hasActiveQueries()) {
+ const int result = static_cast<int>(
+ probabilities_of_priority_levels_->pickRandomProperty());
+ /*LOG(INFO) << "Random priority level: " << result << " has "
+ << current_probabilities_.find(result)->second->getNumObjects()
+ << " queries";*/
+ return result;
+ } else {
+ return kInvalidPriorityLevel;
+ }
+ }
+
+ /**
+ * @brief Randomly pick a query from any priority level in the learner.
+ *
+ * @note A uniformly chosen random number is mapped to a level, then to a query, via the stored probabilities.
+ *
+ * @return A query ID. If no queries are present in the learner, return
+ * kInvalidQueryID.
+ **/
+ inline const int pickRandomQuery() const {
+ if (hasActiveQueries()) {
+ const int random_priority_level = pickRandomPriorityLevel();
+ // Note : The valid priority level values are non-zero.
+ DCHECK_GT(random_priority_level, 0);
+ const int result = pickRandomQueryFromPriorityLevel(
+ static_cast<std::size_t>(random_priority_level));
+ // LOG(INFO) << "Picked random query ID: " << result << " from priority level " << random_priority_level;
+ return result;
+ } else {
+ // LOG(INFO) << "No active query right now";
+ return kInvalidQueryID;
+ }
+ }
+
+ /**
+ * @brief Randomly pick a query from a given priority level in the learner.
+ *
+ * @note A uniformly chosen random number is mapped to a query via the stored probabilities.
+ *
+ * @return A query ID. If no queries are present for this priority level in
+ * the learner, return kInvalidQueryID.
+ **/
+ inline const int pickRandomQueryFromPriorityLevel(
+ const std::size_t priority_level) const {
+ DCHECK(isPriorityLevelPresent(priority_level));
+ if (hasActiveQueries()) {
+ if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+ DCHECK(current_probabilities_.at(priority_level) != nullptr);
+ const auto it = current_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ current_probabilities_.at(priority_level)->pickRandomProperty());
+ }
+ // LOG(INFO) << "No queries in priority level: " << priority_level;
+ } else {
+ DCHECK(default_probabilities_.at(priority_level) != nullptr);
+ const auto it = default_probabilities_.find(priority_level);
+ if (it->second->getNumObjects() > 0) {
+ return static_cast<int>(
+ default_probabilities_.at(priority_level)->pickRandomProperty());
+ }
+ // LOG(INFO) << "No queries in priority level: " << priority_level;
+ }
+ }
+ return kInvalidQueryID;
+ }
+
private:
/**
* @brief Update the probabilities for queries in the given priority level.
@@ -261,6 +369,8 @@ class Learner {
**/
inline bool hasFeedbackFromAllQueriesInPriorityLevel(
const std::size_t priority_level) const {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // return has_feedback_from_all_queries_.at(priority_level);
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
&stats_vector = execution_stats_.at(priority_level);
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
@@ -275,16 +385,19 @@ class Learner {
inline void updateFeedbackFromQueriesInPriorityLevel(
const std::size_t priority_level) {
const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
- &stats_vector = execution_stats_.at(priority_level);
+ &stats_vector = execution_stats_[priority_level];
for (std::size_t i = 0; i < stats_vector.size(); ++i) {
DCHECK(stats_vector[i].second != nullptr);
if (!stats_vector[i].second->hasStats()) {
// At least one query has no statistics so far.
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_[priority_level] = false;
return;
}
}
// All the queries have at least one execution statistic.
- has_feedback_from_all_queries_[priority_level] = true;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ // has_feedback_from_all_queries_[priority_level] = true;
}
/**
@@ -313,31 +426,36 @@ class Learner {
}
/**
- * @param mean_workorder_per_query A vector of pairs in which:
- * 1st element is mean time per work order
- * 2nd element is the query ID.
+ * @param mean_workorder_per_query An unordered_map in which:
+ * 1st element is the query ID.
+ * 2nd element is mean time per work order
*
* @note If any query has mean work order time as 0, we return 0 as the
* denominator.
*
* @return The denominator to be used for probability calculations.
**/
- inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
- &mean_workorder_per_query) const {
+ inline float calculateDenominator(
+ const std::unordered_map<std::size_t, std::size_t>
+ &mean_workorder_per_query) const {
float denominator = 0;
for (const auto &element : mean_workorder_per_query) {
if (element.second != 0) {
denominator += 1/static_cast<float>(element.second);
- } else {
- return 0;
+ /*} else {
+ return 0;*/
}
}
return denominator;
}
inline bool hasFeedbackFromAllPriorityLevels() const {
- for (auto feedback : has_feedback_from_all_queries_) {
- if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+ // for (auto feedback : has_feedback_from_all_queries_) {
+ // NOTE(harshad) : Not using this cache as it gets confusing.
+ for (auto priority_iter = default_probabilities_.cbegin();
+ priority_iter != default_probabilities_.cend();
+ ++priority_iter) {
+ if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_iter->first)) {
return false;
}
}
@@ -369,9 +487,10 @@ class Learner {
// ProbabilityStore for probabilities mapped to the priority levels.
std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+ // NOTE(harshad) : Not using this cache as it gets confusing.
// Key = priority level. Value = A boolean that indicates if we have received
// feedback from all the queries in the given priority level.
- std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+ // std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
int highest_priority_level_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 233dd2e..ed52f75 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -41,7 +41,7 @@ class ProbabilityStore {
* @brief Constructor.
**/
ProbabilityStore()
- : common_denominator_(1.0), mt_(std::random_device()()) {}
+ : common_denominator_(1.0) {}
/**
* @brief Get the number of objects in the store.
@@ -221,11 +221,16 @@ class ProbabilityStore {
/**
* @brief Return a randomly chosen property.
*
+ * TODO(harshad) - If it is expensive to create the random device
+ * on every invocation of this function, make it a class variable.
+ * In which case, we won't be able to mark the function as const.
+ *
* @note The random number is uniformly chosen.
**/
- inline const std::size_t pickRandomProperty() {
+ inline const std::size_t pickRandomProperty() const {
std::uniform_real_distribution<float> dist(0.0, 1.0);
- const float chosen_probability = dist(mt_);
+ std::random_device rd;
+ const float chosen_probability = dist(rd);
return getPropertyForProbability(chosen_probability);
}
@@ -260,7 +265,7 @@ class ProbabilityStore {
* or equal to the input cumulative probability.
**/
inline const std::size_t getPropertyForProbability(
- const float key_cumulative_probability) {
+ const float key_cumulative_probability) const {
DCHECK(!cumulative_probabilities_.empty());
// It doesn't matter in which order the objects are arranged in the
// cumulative_probabilities_ vector.
@@ -296,8 +301,6 @@ class ProbabilityStore {
float common_denominator_;
- std::mt19937_64 mt_;
-
DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index e13f3e0..feaa285 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -44,6 +44,7 @@ typedef tmb::client_id client_id;
typedef tmb::message_type_id message_type_id;
const int kInvalidPriorityLevel = -1;
+const int kInvalidQueryID = -1;
using ClientIDMap = ThreadIDBasedMap<client_id,
'C',
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 107576f..7d67b1b 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -220,15 +220,16 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
// Randomize the orders.
std::random_device rd;
- std::mt19937 g(rd());
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
std::shuffle(priorities_insertion_order.begin(),
priorities_insertion_order.end(),
- g);
+ g1);
std::shuffle(priorities_removal_order.begin(),
priorities_removal_order.end(),
- g);
+ g2);
Learner learner;
EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
@@ -263,4 +264,256 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
}
+TEST_F(LearnerTest, PickRandomPriorityLevelTest) {
+ std::vector<std::size_t> priorities_insertion_order;
+ std::vector<std::size_t> priorities_removal_order;
+ const std::size_t kNumPrioritiesToTest = 20;
+ for (std::size_t priority_num = 1;
+ priority_num <= kNumPrioritiesToTest;
+ ++priority_num) {
+ // Note: Priority level should be non-zero, hence we begin from 1.
+ priorities_insertion_order.emplace_back(priority_num);
+ priorities_removal_order.emplace_back(priority_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ g1);
+
+ std::shuffle(priorities_removal_order.begin(),
+ priorities_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+
+ std::unique_ptr<QueryHandle> handle;
+ // First insert the queries in the order of priorities as defined by
+ // priorities_insertion_order.
+ for (auto it = priorities_insertion_order.begin();
+ it != priorities_insertion_order.end();
+ ++it) {
+ // Note that the query ID is kept the same as priority level for simplicity.
+ handle.reset(new QueryHandle(*it, *it));
+ learner.addQuery(*handle);
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_insertion_order vector.
+ auto find_priority_level_it = std::find(
+ priorities_insertion_order.begin(), it + 1, picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+ }
+
+ // Repeat the tests a few more times.
+ const std::size_t kNumTests = 200;
+ for (std::size_t test_num = 0; test_num < kNumTests; ++test_num) {
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_insertion_order vector.
+ auto find_priority_level_it = std::find(priorities_insertion_order.begin(),
+ priorities_insertion_order.end(),
+ picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+ }
+
+ // Now remove the queries in the order of priorities as defined by
+ // priorities_removal_order.
+ for (auto it = priorities_removal_order.begin();
+ it != priorities_removal_order.end();
+ ++it) {
+ // Recall that the query ID is the same as priority level.
+ const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+ // Try to find the randomly picked priority level in the
+ // priorities_removal_order vector.
+ auto find_priority_level_it = std::find(
+ it, priorities_removal_order.end(), picked_priority_level);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_priority_level_it != priorities_removal_order.end());
+ learner.removeQuery(*it);
+ }
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+}
+
+TEST_F(LearnerTest, PickRandomQueryDefaultProbabilitiesTest) {
+ // We use a set of unique query IDs. For each query ID, we assign a priority
+ // level. The set of priority levels is smaller than the set of query IDs, so
+ // that we can have more than one query for a given priority level.
+
+ // Also, in this test we don't send any completion feedback message to the
+ // learner. Therefore it always refers to the default probabilities set for
+ // the queries.
+ std::vector<std::size_t> query_ids_insertion_order;
+ std::vector<std::size_t> query_ids_removal_order;
+ const std::size_t kNumQueriesToTest = 20;
+ for (std::size_t query_num = 0;
+ query_num < kNumQueriesToTest;
+ ++query_num) {
+ query_ids_insertion_order.emplace_back(query_num);
+ query_ids_removal_order.emplace_back(query_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ g1);
+
+ std::shuffle(query_ids_removal_order.begin(),
+ query_ids_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+ std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+ std::size_t priority_level_index = 0;
+ std::unique_ptr<QueryHandle> handle;
+ // Insert the queries in the order as defined in query_ids_insertion_order.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+ priority_level_index = (priority_level_index + 1) % priority_levels.size();
+ learner.addQuery(*handle);
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(
+ query_ids_insertion_order.begin(), it + 1, picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Repeat the tests a few more times.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Remove the queries in the order as defined in query_ids_removal_order.
+ for (auto it = query_ids_removal_order.begin();
+ it != query_ids_removal_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_removal_order.
+ auto find_query_it = std::find(
+ it, query_ids_removal_order.end(), picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+ learner.removeQuery(*it);
+ }
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
+
+TEST_F(LearnerTest, PickRandomQueryCurrentProbabilitiesTest) {
+ // We use a set of unique query IDs. For each query ID, we assign a priority
+ // level. The set of priority levels is smaller than the set of query IDs, so
+ // that we can have more than one query for a given priority level.
+
+ // In this test we send completion feedback messages for all the queries
+ // to the learner. Therefore it refers to the current probabilities set for
+ // the queries.
+ std::vector<std::size_t> query_ids_insertion_order;
+ std::vector<std::size_t> query_ids_removal_order;
+ const std::size_t kNumQueriesToTest = 20;
+ for (std::size_t query_num = 0;
+ query_num < kNumQueriesToTest;
+ ++query_num) {
+ query_ids_insertion_order.emplace_back(query_num);
+ query_ids_removal_order.emplace_back(query_num);
+ }
+
+ // Randomize the orders.
+ std::random_device rd;
+ std::mt19937 g1(rd());
+ std::mt19937 g2(rd());
+
+ std::shuffle(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ g1);
+
+ std::shuffle(query_ids_removal_order.begin(),
+ query_ids_removal_order.end(),
+ g2);
+
+ Learner learner;
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+ std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+ std::size_t priority_level_index = 0;
+ std::unique_ptr<QueryHandle> handle;
+ // Insert the queries in the order as defined in query_ids_insertion_order.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+ priority_level_index = (priority_level_index + 1) % priority_levels.size();
+ learner.addQuery(*handle);
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(
+ query_ids_insertion_order.begin(), it + 1, picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Now send one completion feedback message per query to the learner.
+ const std::size_t kOperatorID = 0;
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ // LOG(INFO) << "Completion message for query : " << *it;
+ learner.addCompletionFeedback(createMockCompletionMessage(*it, kOperatorID));
+ }
+
+ // Repeat the tests a few more times.
+ for (auto it = query_ids_insertion_order.begin();
+ it != query_ids_insertion_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_insertion_order.
+ auto find_query_it = std::find(query_ids_insertion_order.begin(),
+ query_ids_insertion_order.end(),
+ picked_query_id);
+ // We expect the search to be successful.
+ EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+ }
+
+ // Remove the queries in the order as defined in query_ids_removal_order.
+ for (auto it = query_ids_removal_order.begin();
+ it != query_ids_removal_order.end();
+ ++it) {
+ const int picked_query_id = learner.pickRandomQuery();
+ // Try to find the randomly picked query ID in query_ids_removal_order.
+ auto find_query_it = std::find(
+ it, query_ids_removal_order.end(), picked_query_id);
+ // We expect the search to be successful.
+ // LOG(INFO) << "Picked query ID: " << picked_query_id << "\n";
+ EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+ learner.removeQuery(*it);
+ // LOG(INFO) << "Removed query ID: " << *it;
+ }
+
+ EXPECT_FALSE(learner.hasActiveQueries());
+ EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
} // namespace quickstep