You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/25 14:40:34 UTC

[01/11] incubator-quickstep git commit: Added Window Aggregation Function in optimizer. [Forced Update!]

Repository: incubator-quickstep
Updated Branches:
  refs/heads/scheduler++ 688eb25c5 -> 2a0def654 (forced update)


Added Window Aggregation Function in optimizer.

  - The resolver could understand optional window clause w/ aggregation functions.
  - Only one window aggregation function is allowed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/714874ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/714874ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/714874ce

Branch: refs/heads/scheduler++
Commit: 714874ce54e12972285a43f92784ef6954a8b6fd
Parents: d642891
Author: shixuan <sh...@wisc.edu>
Authored: Tue Jun 21 20:08:52 2016 +0000
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Jun 24 18:59:47 2016 -0700

----------------------------------------------------------------------
 query_optimizer/expressions/CMakeLists.txt      |  17 +-
 query_optimizer/expressions/ExpressionType.hpp  |   3 +-
 query_optimizer/expressions/PatternMatcher.hpp  |   3 +
 .../expressions/WindowAggregateFunction.cpp     | 193 ++++++++++++
 .../expressions/WindowAggregateFunction.hpp     | 246 +++++++++++++++
 query_optimizer/logical/CMakeLists.txt          |  18 +-
 query_optimizer/logical/LogicalType.hpp         |   3 +-
 query_optimizer/logical/PatternMatcher.hpp      |   2 +
 query_optimizer/logical/WindowAggregate.cpp     |  85 +++++
 query_optimizer/logical/WindowAggregate.hpp     | 123 ++++++++
 query_optimizer/resolver/CMakeLists.txt         |   2 +
 query_optimizer/resolver/Resolver.cpp           | 314 ++++++++++++++++++-
 query_optimizer/resolver/Resolver.hpp           |  66 +++-
 query_optimizer/strategy/CMakeLists.txt         |   3 +-
 query_optimizer/strategy/OneToOne.cpp           |   5 +
 .../tests/logical_generator/Select.test         | 162 ++++++++++
 query_optimizer/tests/resolver/Select.test      | 162 ++++++++++
 17 files changed, 1387 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 6c40741..08d7df5 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -43,6 +43,7 @@ add_library(quickstep_queryoptimizer_expressions_SearchedCase SearchedCase.cpp S
 add_library(quickstep_queryoptimizer_expressions_SimpleCase SimpleCase.cpp SimpleCase.hpp)
 add_library(quickstep_queryoptimizer_expressions_SubqueryExpression SubqueryExpression.cpp SubqueryExpression.hpp)
 add_library(quickstep_queryoptimizer_expressions_UnaryExpression UnaryExpression.cpp UnaryExpression.hpp)
+add_library(quickstep_queryoptimizer_expressions_WindowAggregateFunction WindowAggregateFunction.cpp WindowAggregateFunction.hpp)
 
 # Link dependencies:
 target_link_libraries(quickstep_queryoptimizer_expressions_AggregateFunction
@@ -301,6 +302,19 @@ target_link_libraries(quickstep_queryoptimizer_expressions_UnaryExpression
                       quickstep_types_operations_unaryoperations_UnaryOperation
                       quickstep_types_operations_unaryoperations_UnaryOperationID
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
+                      glog
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Expression
+                      quickstep_queryoptimizer_expressions_ExpressionType
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Scalar
+                      quickstep_types_Type
+                      quickstep_utility_Cast
+                      quickstep_utility_Macros)
+
 
 # Module all-in-one library:
 add_library(quickstep_queryoptimizer_expressions ../../empty_src.cpp OptimizerExpressionsModule.hpp)
@@ -330,4 +344,5 @@ target_link_libraries(quickstep_queryoptimizer_expressions
                       quickstep_queryoptimizer_expressions_SearchedCase
                       quickstep_queryoptimizer_expressions_SimpleCase
                       quickstep_queryoptimizer_expressions_SubqueryExpression
-                      quickstep_queryoptimizer_expressions_UnaryExpression)
+                      quickstep_queryoptimizer_expressions_UnaryExpression
+                      quickstep_queryoptimizer_expressions_WindowAggregateFunction)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/ExpressionType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/ExpressionType.hpp b/query_optimizer/expressions/ExpressionType.hpp
index 23770e0..77e0874 100644
--- a/query_optimizer/expressions/ExpressionType.hpp
+++ b/query_optimizer/expressions/ExpressionType.hpp
@@ -49,7 +49,8 @@ enum class ExpressionType {
   kSearchedCase,
   kSimpleCase,
   kSubqueryExpression,
-  kUnaryExpression
+  kUnaryExpression,
+  kWindowAggregateFunction
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/PatternMatcher.hpp b/query_optimizer/expressions/PatternMatcher.hpp
index 87bc52a..2cc91d6 100644
--- a/query_optimizer/expressions/PatternMatcher.hpp
+++ b/query_optimizer/expressions/PatternMatcher.hpp
@@ -52,6 +52,7 @@ class Scalar;
 class ScalarLiteral;
 class Sum;
 class UnaryExpression;
+class WindowAggregateFunction;
 
 /** \addtogroup OptimizerExpressions
  *  @{
@@ -155,6 +156,8 @@ using SomeScalar = SomeExpressionNode<Scalar,
                                       ExpressionType::kUnaryExpression>;
 using SomeScalarLiteral = SomeExpressionNode<ScalarLiteral, ExpressionType::kScalarLiteral>;
 using SomeUnaryExpression = SomeExpressionNode<UnaryExpression, ExpressionType::kUnaryExpression>;
+using SomeWindowAggregateFunction = SomeExpressionNode<WindowAggregateFunction,
+                                                       ExpressionType::kWindowAggregateFunction>;
 
 using SomeAggregateFunction = SomeExpressionNode<AggregateFunction,
                                                  ExpressionType::kAggregateFunction>;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/WindowAggregateFunction.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.cpp b/query_optimizer/expressions/WindowAggregateFunction.cpp
new file mode 100644
index 0000000..7b1f304
--- /dev/null
+++ b/query_optimizer/expressions/WindowAggregateFunction.cpp
@@ -0,0 +1,193 @@
+/**
+ *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "types/Type.hpp"
+#include "utility/Cast.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace expressions {
+
+bool WindowAggregateFunction::isNullable() const {
+  std::vector<const Type*> argument_types;
+  for (const ScalarPtr &argument : arguments_) {
+    argument_types.emplace_back(&argument->getValueType());
+  }
+
+  const Type *return_type = window_aggregate_.resultTypeForArgumentTypes(argument_types);
+  DCHECK(return_type != nullptr);
+  return return_type->isNullable();
+}
+
+const Type& WindowAggregateFunction::getValueType() const {
+  std::vector<const Type*> argument_types;
+  for (const ScalarPtr &argument : arguments_) {
+    argument_types.emplace_back(&argument->getValueType());
+  }
+
+  const Type *return_type = window_aggregate_.resultTypeForArgumentTypes(argument_types);
+  DCHECK(return_type != nullptr);
+  return *return_type;
+}
+
+WindowAggregateFunctionPtr WindowAggregateFunction::Create(
+    const ::quickstep::AggregateFunction &window_aggregate,
+    const std::vector<ScalarPtr> &arguments,
+    const WindowInfo &window_info,
+    const std::string &window_name,
+    const bool is_distinct) {
+#ifdef QUICKSTEP_DEBUG
+  std::vector<const Type*> argument_types;
+  for (const ScalarPtr &argument : arguments) {
+    argument_types.emplace_back(&argument->getValueType());
+  }
+  DCHECK(window_aggregate.canApplyToTypes(argument_types));
+#endif  // QUICKSTEP_DEBUG
+
+  return WindowAggregateFunctionPtr(
+      new WindowAggregateFunction(window_aggregate, arguments, window_info, window_name, is_distinct));
+}
+
+ExpressionPtr WindowAggregateFunction::copyWithNewChildren(
+    const std::vector<ExpressionPtr> &new_children) const {
+  std::vector<ScalarPtr> new_arguments;
+  for (const ExpressionPtr &expression_ptr : new_children) {
+    ScalarPtr expr_as_scalar;
+    CHECK(SomeScalar::MatchesWithConditionalCast(expression_ptr, &expr_as_scalar))
+        << expression_ptr->toString();
+    new_arguments.emplace_back(std::move(expr_as_scalar));
+  }
+
+  return WindowAggregateFunctionPtr(
+      new WindowAggregateFunction(window_aggregate_,
+                                  new_arguments,
+                                  window_info_,
+                                  window_name_,
+                                  is_distinct_));
+}
+
+std::vector<AttributeReferencePtr> WindowAggregateFunction::getReferencedAttributes() const {
+  std::vector<AttributeReferencePtr> referenced_attributes;
+  for (const ScalarPtr &argument : arguments_) {
+    const std::vector<AttributeReferencePtr> referenced_attributes_in_argument =
+        argument->getReferencedAttributes();
+    referenced_attributes.insert(referenced_attributes.end(),
+                                 referenced_attributes_in_argument.begin(),
+                                 referenced_attributes_in_argument.end());
+  }
+
+  referenced_attributes.insert(referenced_attributes.end(),
+                               window_info_.partition_by_attributes.begin(),
+                               window_info_.partition_by_attributes.end());
+
+  referenced_attributes.insert(referenced_attributes.end(),
+                               window_info_.order_by_attributes.begin(),
+                               window_info_.order_by_attributes.end());
+
+  return referenced_attributes;
+}
+
+void WindowAggregateFunction::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("function");
+  inline_field_values->push_back(window_aggregate_.getName());
+
+  container_child_field_names->push_back("arguments");
+  container_child_fields->emplace_back(CastSharedPtrVector<OptimizerTreeBase>(arguments_));
+
+  inline_field_names->push_back("window_name");
+  inline_field_values->push_back(window_name_);
+
+  container_child_field_names->push_back("partition_by");
+  container_child_fields->emplace_back(
+      CastSharedPtrVector<OptimizerTreeBase>(window_info_.partition_by_attributes));
+
+  container_child_field_names->push_back("order_by");
+  container_child_fields->emplace_back(
+      CastSharedPtrVector<OptimizerTreeBase>(window_info_.order_by_attributes));
+
+  inline_field_names->push_back("is_ascending");
+  std::string ascending_list("[");
+  for (const bool is_ascending : window_info_.order_by_directions) {
+    if (is_ascending) {
+      ascending_list.append("true,");
+    } else {
+      ascending_list.append("false,");
+    }
+  }
+  if (!window_info_.order_by_directions.empty()) {
+    ascending_list.pop_back();
+  }
+  ascending_list.append("]");
+  inline_field_values->push_back(ascending_list);
+
+  inline_field_names->push_back("nulls_first");
+  std::string nulls_first_flags("[");
+  for (const bool nulls_first_flag : window_info_.nulls_first) {
+    if (nulls_first_flag) {
+      nulls_first_flags.append("true,");
+    } else {
+      nulls_first_flags.append("false,");
+    }
+  }
+  if (!window_info_.nulls_first.empty()) {
+    nulls_first_flags.pop_back();
+  }
+  nulls_first_flags.append("]");
+  inline_field_values->push_back(nulls_first_flags);
+
+  if (window_info_.frame_info != nullptr) {
+    const WindowFrameInfo *frame_info = window_info_.frame_info;
+
+    inline_field_names->push_back("frame_mode");
+    inline_field_values->push_back(frame_info->is_row ? "row" : "range");
+
+    inline_field_names->push_back("num_preceding");
+    inline_field_values->push_back(std::to_string(frame_info->num_preceding));
+
+    inline_field_names->push_back("num_following");
+    inline_field_values->push_back(std::to_string(frame_info->num_following));
+  }
+
+  if (is_distinct_) {
+    inline_field_names->push_back("is_distinct");
+    inline_field_values->push_back("true");
+  }
+}
+
+}  // namespace expressions
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/expressions/WindowAggregateFunction.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/WindowAggregateFunction.hpp b/query_optimizer/expressions/WindowAggregateFunction.hpp
new file mode 100644
index 0000000..0bee28f
--- /dev/null
+++ b/query_optimizer/expressions/WindowAggregateFunction.hpp
@@ -0,0 +1,246 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_WINDOW_AGGREGATE_FUNCTION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_WINDOW_AGGREGATE_FUNCTION_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class AggregateFunction;
+class Type;
+
+namespace optimizer {
+namespace expressions {
+
+/** \addtogroup OptimizerExpressions
+ *  @{
+ */
+
+struct WindowFrameInfo {
+  /**
+   * @brief Constructor.
+   *
+   * @param is_row_in True if this window frame is defined by ROWS, false if
+   *                  defined by RANGE.
+   * @param num_preceding_in The number of preceding tuples the window frame
+   *                         will cover, -1 means UNBOUNDED.
+   * @param num_following_in The number of following tuples the window frame
+   *                         will cover, -1 means UNBOUNDED.
+   **/
+  WindowFrameInfo(const bool is_row_in,
+                  const int num_preceding_in,
+                  const int num_following_in)
+      : is_row(is_row_in),
+        num_preceding(num_preceding_in),
+        num_following(num_following_in) {}
+
+  const bool is_row;
+  const int num_preceding;
+  const int num_following;
+};
+
+struct WindowInfo {
+  /**
+   * @brief Constructor.
+   *
+   * @param partition_by_attributes_in The partition keys for the window.
+   * @param order_by_attributes_in The order keys for the window.
+   * @param order_by_directions_in The order direction for order key.
+   * @param nulls_first_in The nulls' position for order key.
+   * @param frame_info_in The window frame information for the window. Null
+   *        means there is no explicit window frame definition.
+   **/
+  WindowInfo(const std::vector<AttributeReferencePtr> &partition_by_attributes_in,
+             const std::vector<AttributeReferencePtr> &order_by_attributes_in,
+             const std::vector<bool> &order_by_directions_in,
+             const std::vector<bool> &nulls_first_in,
+             const WindowFrameInfo *frame_info_in)
+      : partition_by_attributes(partition_by_attributes_in),
+        order_by_attributes(order_by_attributes_in),
+        order_by_directions(order_by_directions_in),
+        nulls_first(nulls_first_in),
+        frame_info(frame_info_in) {}
+
+  const std::vector<AttributeReferencePtr> partition_by_attributes;
+  const std::vector<AttributeReferencePtr> order_by_attributes;
+  const std::vector<bool> order_by_directions;
+  const std::vector<bool> nulls_first;
+  const WindowFrameInfo *frame_info;
+};
+
+class WindowAggregateFunction;
+typedef std::shared_ptr<const WindowAggregateFunction> WindowAggregateFunctionPtr;
+
+/**
+ * @brief Represents a window aggregate function and its arguments in the
+ *        optimizer. This class wraps some of the functionality from
+ *        quickstep::AggregateFunction and represents a particular instance
+ *        of an aggregate during query optimization.
+ **/
+class WindowAggregateFunction : public Expression {
+ public:
+  /**
+   * @brief Destructor.
+   */
+  ~WindowAggregateFunction() override {}
+
+  ExpressionType getExpressionType() const override {
+    return ExpressionType::kWindowAggregateFunction;
+  }
+
+  std::string getName() const override {
+    return "WindowAggregateFunction";
+  }
+
+  const Type& getValueType() const override;
+
+  bool isConstant() const override {
+    // Window aggregate function is never considered as a constant expression.
+    return false;
+  }
+
+  ExpressionPtr copyWithNewChildren(
+      const std::vector<ExpressionPtr> &new_children) const override;
+
+  std::vector<AttributeReferencePtr> getReferencedAttributes() const override;
+
+  /**
+   * @return Whether the type of the return value is nullable.
+   **/
+  bool isNullable() const;
+
+  /**
+   * @return The AggregateFunction singleton (from the expression system)
+   *         for this node.
+   **/
+  inline const ::quickstep::AggregateFunction& window_aggregate() const {
+    return window_aggregate_;
+  }
+
+  /**
+   * @return The list of scalar arguments to this aggregate.
+   **/
+  inline const std::vector<ScalarPtr>& arguments() const {
+    return arguments_;
+  }
+
+  /**
+   * @return The window info of this window aggregate function.
+   **/
+  inline const WindowInfo window_info() const {
+    return window_info_;
+  }
+
+  /**
+   * @return The name of the window.
+   **/
+  inline const std::string window_name() const {
+    return window_name_;
+  }
+
+  /**
+   * @return Whether this is a DISTINCT aggregation.
+   **/
+  inline bool is_distinct() const {
+    return is_distinct_;
+  }
+
+  /**
+   * @brief Create a new WindowAggregateFunction by directly defined window.
+   *
+   * @warning It is an error to call this with arguments that the given
+   *          aggregate can not apply to.
+   *
+   * @param window_aggregate The underlying AggregateFunction from the expression system.
+   * @param arguments A list of arguments to the window aggregate function.
+   * @param window_info The window info of the window aggregate function.
+   * @param window_name The name of the window.
+   * @param is_distinct Whether this is a DISTINCT aggregation.
+   * @return A new WindowAggregateFunctionPtr.
+   **/
+  static WindowAggregateFunctionPtr Create(const ::quickstep::AggregateFunction &window_aggregate,
+                                           const std::vector<ScalarPtr> &arguments,
+                                           const WindowInfo &window_info,
+                                           const std::string &window_name,
+                                           const bool is_distinct);
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  /**
+   * @brief Constructor.
+   *
+   * @param window_aggregate The actual AggregateFunction to use.
+   * @param arguments A list of arguments to the window aggregate function.
+   * @param window_info The window info of the window aggregate function.
+   * @param is_distinct Indicates whether this is a DISTINCT aggregation.
+   */
+  WindowAggregateFunction(const ::quickstep::AggregateFunction &window_aggregate,
+                          const std::vector<ScalarPtr> &arguments,
+                          const WindowInfo &window_info,
+                          const std::string &window_name,
+                          const bool is_distinct)
+      : window_aggregate_(window_aggregate),
+        arguments_(arguments),
+        window_info_(window_info),
+        window_name_(window_name),
+        is_distinct_(is_distinct) {
+    for (const ScalarPtr &child : arguments_) {
+      addChild(child);
+    }
+  }
+
+  // TODO(Shixuan): Currently this class uses AggregateFunction as
+  // window_aggregate_. If it really needs to be separated from
+  // AggregateFunction, a new class for window aggregate functions should be
+  // created as quickstep::WindowAggregateFunction.
+  const ::quickstep::AggregateFunction &window_aggregate_;
+  std::vector<ScalarPtr> arguments_;
+  const WindowInfo window_info_;
+  const std::string window_name_;
+  const bool is_distinct_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregateFunction);
+};
+
+/** @} */
+
+}  // namespace expressions
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_WINDOW_AGGREGATE_FUNCTION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/CMakeLists.txt b/query_optimizer/logical/CMakeLists.txt
index 61c6234..b787c60 100644
--- a/query_optimizer/logical/CMakeLists.txt
+++ b/query_optimizer/logical/CMakeLists.txt
@@ -43,6 +43,7 @@ add_library(quickstep_queryoptimizer_logical_TableReference TableReference.cpp T
 add_library(quickstep_queryoptimizer_logical_TableGenerator ../../empty_src.cpp TableGenerator.hpp)
 add_library(quickstep_queryoptimizer_logical_TopLevelPlan TopLevelPlan.cpp TopLevelPlan.hpp)
 add_library(quickstep_queryoptimizer_logical_UpdateTable UpdateTable.cpp UpdateTable.hpp)
+add_library(quickstep_queryoptimizer_logical_WindowAggregate WindowAggregate.cpp WindowAggregate.hpp)
 
 # Link dependencies:
 target_link_libraries(quickstep_queryoptimizer_logical_Aggregate
@@ -259,6 +260,20 @@ target_link_libraries(quickstep_queryoptimizer_logical_UpdateTable
                       quickstep_queryoptimizer_logical_LogicalType
                       quickstep_utility_Cast
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_logical_WindowAggregate
+                      glog
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Expression
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_logical_Logical
+                      quickstep_queryoptimizer_logical_LogicalType
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_utility_Cast
+                      quickstep_utility_Macros)
+
 
 # Module all-in-one library:
 add_library(quickstep_queryoptimizer_logical ../../empty_src.cpp OptimizerLogicalModule.hpp)
@@ -287,4 +302,5 @@ target_link_libraries(quickstep_queryoptimizer_logical
                       quickstep_queryoptimizer_logical_TableGenerator
                       quickstep_queryoptimizer_logical_TableReference
                       quickstep_queryoptimizer_logical_TopLevelPlan
-                      quickstep_queryoptimizer_logical_UpdateTable)
+                      quickstep_queryoptimizer_logical_UpdateTable
+                      quickstep_queryoptimizer_logical_WindowAggregate)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/LogicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/LogicalType.hpp b/query_optimizer/logical/LogicalType.hpp
index 1b9366e..c82fb47 100644
--- a/query_optimizer/logical/LogicalType.hpp
+++ b/query_optimizer/logical/LogicalType.hpp
@@ -49,7 +49,8 @@ enum class LogicalType {
   kTableGenerator,
   kTableReference,
   kTopLevelPlan,
-  kUpdateTable
+  kUpdateTable,
+  kWindowAggregate
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/PatternMatcher.hpp b/query_optimizer/logical/PatternMatcher.hpp
index ff8f3d0..de8609e 100644
--- a/query_optimizer/logical/PatternMatcher.hpp
+++ b/query_optimizer/logical/PatternMatcher.hpp
@@ -45,6 +45,7 @@ class Sort;
 class TableReference;
 class TopLevelPlan;
 class UpdateTable;
+class WindowAggregate;
 
 /** \addtogroup OptimizerLogical
  *  @{
@@ -130,6 +131,7 @@ using SomeSort = SomeLogicalNode<Sort, LogicalType::kSort>;
 using SomeTableReference = SomeLogicalNode<TableReference, LogicalType::kTableReference>;
 using SomeTopLevelPlan = SomeLogicalNode<TopLevelPlan, LogicalType::kTopLevelPlan>;
 using SomeUpdateTable = SomeLogicalNode<UpdateTable, LogicalType::kUpdateTable>;
+using SomeWindowAggregate = SomeLogicalNode<WindowAggregate, LogicalType::kWindowAggregate>;
 
 /** @} */
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/WindowAggregate.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/WindowAggregate.cpp b/query_optimizer/logical/WindowAggregate.cpp
new file mode 100644
index 0000000..0d747b6
--- /dev/null
+++ b/query_optimizer/logical/WindowAggregate.cpp
@@ -0,0 +1,85 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/logical/WindowAggregate.hpp"
+
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "utility/Cast.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+LogicalPtr WindowAggregate::copyWithNewChildren(
+    const std::vector<LogicalPtr> &new_children) const {
+  DCHECK_EQ(getNumChildren(), new_children.size());
+  return Create(new_children[0], window_aggregate_expression_);
+}
+
+std::vector<E::AttributeReferencePtr> WindowAggregate::getOutputAttributes() const {
+  std::vector<E::AttributeReferencePtr> output_attributes(input_->getOutputAttributes());
+  output_attributes.push_back(E::ToRef(window_aggregate_expression_));
+  return output_attributes;
+}
+
+std::vector<E::AttributeReferencePtr> WindowAggregate::getReferencedAttributes() const {
+  return window_aggregate_expression_->getReferencedAttributes();
+}
+
+LogicalPtr WindowAggregate::copyWithNewInputExpressions(
+    const std::vector<E::ExpressionPtr> &input_expressions) const {
+  // Only one expression needed
+  DCHECK_EQ(1u, input_expressions.size());
+
+  E::AliasPtr window_aggregate_expression;
+  E::SomeAlias::MatchesWithConditionalCast(input_expressions[0],
+                                           &window_aggregate_expression);
+
+  return Create(input_, window_aggregate_expression);
+}
+
+void WindowAggregate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  non_container_child_field_names->push_back("input");
+  non_container_child_fields->push_back(input_);
+
+  non_container_child_field_names->push_back("window_aggregate_expression");
+  non_container_child_fields->push_back(window_aggregate_expression_);
+}
+
+}  // namespace logical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/logical/WindowAggregate.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/logical/WindowAggregate.hpp b/query_optimizer/logical/WindowAggregate.hpp
new file mode 100644
index 0000000..dcd9a7d
--- /dev/null
+++ b/query_optimizer/logical/WindowAggregate.hpp
@@ -0,0 +1,123 @@
+/**
+ *   Copyright 2011-2015 Quickstep Technologies LLC.
+ *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_WINDOW_AGGREGATE_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_WINDOW_AGGREGATE_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/logical/LogicalType.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace logical {
+
+/** \addtogroup OptimizerLogical
+ *  @{
+ */
+
+class WindowAggregate;
+typedef std::shared_ptr<const WindowAggregate> WindowAggregatePtr;
+
+/**
+ * @brief Window aggregate operator that computes window aggregate expressions.
+ */
+class WindowAggregate : public Logical {
+ public:
+  LogicalType getLogicalType() const override {
+    return LogicalType::kWindowAggregate;
+  }
+
+  std::string getName() const override { return "WindowAggregate"; }
+
+  /**
+   * @return The input logical node.
+   */
+  const LogicalPtr& input() const { return input_; }
+
+  /**
+   * @return The aliased window aggregate expression computed by this node.
+   */
+  const expressions::AliasPtr& window_aggregate_expression() const {
+    return window_aggregate_expression_;
+  }
+
+  LogicalPtr copyWithNewChildren(
+      const std::vector<LogicalPtr> &new_children) const override;
+
+  LogicalPtr copyWithNewInputExpressions(
+      const std::vector<expressions::ExpressionPtr> &input_expressions) const override;
+
+  std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  /**
+   * @brief Creates a WindowAggregate logical node.
+   *
+   * @param input The input node.
+   * @param window_aggregate_expression The window aggregate expression.
+   * @return An immutable WindowAggregate node.
+   */
+  static WindowAggregatePtr Create(
+      const LogicalPtr &input,
+      const expressions::AliasPtr &window_aggregate_expression) {
+    return WindowAggregatePtr(new WindowAggregate(input, window_aggregate_expression));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  // Private so that nodes are only created through Create(). Registers the
+  // input as a child and the window aggregate expression as an input
+  // expression of this node.
+  WindowAggregate(const LogicalPtr &input,
+                  const expressions::AliasPtr &window_aggregate_expression)
+      : input_(input),
+        window_aggregate_expression_(window_aggregate_expression) {
+    addChild(input_);
+    addInputExpression(window_aggregate_expression_);
+  }
+
+  // The input logical node.
+  const LogicalPtr input_;
+  // The alias wrapping the window aggregate function.
+  const expressions::AliasPtr window_aggregate_expression_;
+
+  DISALLOW_COPY_AND_ASSIGN(WindowAggregate);
+};
+
+/** @} */
+
+}  // namespace logical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_LOGICAL_WINDOW_AGGREGATE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/resolver/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/CMakeLists.txt b/query_optimizer/resolver/CMakeLists.txt
index dc7eac0..fb75767 100644
--- a/query_optimizer/resolver/CMakeLists.txt
+++ b/query_optimizer/resolver/CMakeLists.txt
@@ -89,6 +89,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_queryoptimizer_expressions_SimpleCase
                       quickstep_queryoptimizer_expressions_SubqueryExpression
                       quickstep_queryoptimizer_expressions_UnaryExpression
+                      quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       quickstep_queryoptimizer_logical_Aggregate
                       quickstep_queryoptimizer_logical_CopyFrom
                       quickstep_queryoptimizer_logical_CreateIndex
@@ -109,6 +110,7 @@ target_link_libraries(quickstep_queryoptimizer_resolver_Resolver
                       quickstep_queryoptimizer_logical_TableReference
                       quickstep_queryoptimizer_logical_TopLevelPlan
                       quickstep_queryoptimizer_logical_UpdateTable
+                      quickstep_queryoptimizer_logical_WindowAggregate
                       quickstep_queryoptimizer_resolver_NameResolver
                       quickstep_storage_StorageBlockLayout_proto
                       quickstep_storage_StorageConstants

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/resolver/Resolver.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.cpp b/query_optimizer/resolver/Resolver.cpp
index ffc173a..f880ce7 100644
--- a/query_optimizer/resolver/Resolver.cpp
+++ b/query_optimizer/resolver/Resolver.cpp
@@ -85,6 +85,7 @@
 #include "query_optimizer/expressions/SimpleCase.hpp"
 #include "query_optimizer/expressions/SubqueryExpression.hpp"
 #include "query_optimizer/expressions/UnaryExpression.hpp"
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
 #include "query_optimizer/logical/Aggregate.hpp"
 #include "query_optimizer/logical/CopyFrom.hpp"
 #include "query_optimizer/logical/CreateIndex.hpp"
@@ -104,6 +105,7 @@
 #include "query_optimizer/logical/TableReference.hpp"
 #include "query_optimizer/logical/TopLevelPlan.hpp"
 #include "query_optimizer/logical/UpdateTable.hpp"
+#include "query_optimizer/logical/WindowAggregate.hpp"
 #include "query_optimizer/resolver/NameResolver.hpp"
 #include "storage/StorageBlockLayout.pb.h"
 #include "storage/StorageConstants.hpp"
@@ -164,9 +166,11 @@ struct Resolver::ExpressionResolutionInfo {
    */
   ExpressionResolutionInfo(const NameResolver &name_resolver_in,
                            QueryAggregationInfo *query_aggregation_info_in,
+                           WindowAggregationInfo *window_aggregation_info_in,
                            SelectListInfo *select_list_info_in)
       : name_resolver(name_resolver_in),
         query_aggregation_info(query_aggregation_info_in),
+        window_aggregation_info(window_aggregation_info_in),
         select_list_info(select_list_info_in) {}
 
   /**
@@ -180,6 +184,7 @@ struct Resolver::ExpressionResolutionInfo {
       : name_resolver(parent.name_resolver),
         not_allow_aggregate_here(parent.not_allow_aggregate_here),
         query_aggregation_info(parent.query_aggregation_info),
+        window_aggregation_info(parent.window_aggregation_info),
         select_list_info(parent.select_list_info) {}
 
   /**
@@ -187,16 +192,29 @@ struct Resolver::ExpressionResolutionInfo {
    */
   bool hasAggregate() const { return parse_aggregate_expression != nullptr; }
 
+  /**
+   * @return True if the expression contains a window aggregate function.
+   **/
+  bool hasWindowAggregate() const {
+    return parse_window_aggregate_expression != nullptr;
+  }
+
   const NameResolver &name_resolver;
   // Empty if aggregations are allowed.
   const std::string not_allow_aggregate_here;
 
   // Can be NULL if aggregations are not allowed.
   QueryAggregationInfo *query_aggregation_info = nullptr;
+
+  // Window map and the alias expressions that wrap window aggregate functions.
+  WindowAggregationInfo *window_aggregation_info = nullptr;
+
   // Can be NULL if alias references to SELECT-list expressions are not allowed.
   SelectListInfo *select_list_info = nullptr;
   // The first aggregate in the expression.
   const ParseTreeNode *parse_aggregate_expression = nullptr;
+  // The first window aggregate in the expression.
+  const ParseTreeNode *parse_window_aggregate_expression = nullptr;
 };
 
 struct Resolver::QueryAggregationInfo {
@@ -209,6 +227,26 @@ struct Resolver::QueryAggregationInfo {
   std::vector<E::AliasPtr> aggregate_expressions;
 };
 
+// Pairs a resolved window definition with the logical plan kept for it.
+struct Resolver::WindowPlan {
+  WindowPlan(const E::WindowInfo &window_info_in,
+             const L::LogicalPtr &logical_plan_in)
+      : window_info(window_info_in),
+        logical_plan(logical_plan_in) {}
+
+  // The resolved window definition.
+  const E::WindowInfo window_info;
+  // The logical plan stored with the window. resolveSelect() stores the
+  // output of resolveSortInWindow() here.
+  const L::LogicalPtr logical_plan;
+};
+
+// Holds the named windows of a query block together with the window aggregate
+// expressions collected while resolving it.
+struct Resolver::WindowAggregationInfo {
+  explicit WindowAggregationInfo(const std::unordered_map<std::string, WindowPlan> &window_map_in)
+      : window_map(window_map_in) {}
+
+  // Named windows keyed by window name; a copy of the constructor argument.
+  const std::unordered_map<std::string, WindowPlan> window_map;
+  // Alias expressions that wrap window aggregate functions.
+  std::vector<E::AliasPtr> window_aggregate_expressions;
+};
+
 struct Resolver::SelectListInfo {
  public:
   /**
@@ -973,8 +1011,36 @@ L::LogicalPtr Resolver::resolveSelect(
         logical_plan, resolvePredicate(parse_predicate, &expr_resolution_info));
   }
 
+  // Resolve WINDOW clause.
+  std::unordered_map<std::string, WindowPlan> sorted_window_map;
+  if (select_query.window_list() != nullptr) {
+    if (select_query.window_list()->size() > 1) {
+      THROW_SQL_ERROR_AT(&(*select_query.window_list()->begin()))
+          << "Currently we don't support multiple window aggregation functions";
+    }
+
+    // Sort the table according to the window.
+    for (const ParseWindow &window : *select_query.window_list()) {
+      // Check for duplicate definition.
+      // Currently this is useless since we only support one window.
+      if (sorted_window_map.find(window.name()->value()) != sorted_window_map.end()) {
+        THROW_SQL_ERROR_AT(window.name())
+            << "Duplicate definition of window " << window.name()->value();
+      }
+
+      E::WindowInfo resolved_window = resolveWindow(window, *name_resolver);
+      L::LogicalPtr sorted_logical_plan = resolveSortInWindow(logical_plan,
+                                                              resolved_window);
+
+      WindowPlan window_plan(resolved_window, sorted_logical_plan);
+
+      sorted_window_map.emplace(window.name()->value(), window_plan);
+    }
+  }
+
   QueryAggregationInfo query_aggregation_info(
       (select_query.group_by() != nullptr));
+  WindowAggregationInfo window_aggregation_info(sorted_window_map);
 
   // Resolve SELECT-list clause.
   std::vector<E::NamedExpressionPtr> select_list_expressions;
@@ -984,6 +1050,7 @@ L::LogicalPtr Resolver::resolveSelect(
                       type_hints,
                       *name_resolver,
                       &query_aggregation_info,
+                      &window_aggregation_info,
                       &select_list_expressions,
                       &has_aggregate_per_expression);
   DCHECK_EQ(has_aggregate_per_expression.size(),
@@ -992,6 +1059,29 @@ L::LogicalPtr Resolver::resolveSelect(
   SelectListInfo select_list_info(select_list_expressions,
                                   has_aggregate_per_expression);
 
+  // Create window aggregate node if needed
+  for (const E::AliasPtr &alias : window_aggregation_info.window_aggregate_expressions) {
+    E::WindowAggregateFunctionPtr window_aggregate_function;
+    if (!E::SomeWindowAggregateFunction::MatchesWithConditionalCast(alias->expression(),
+                                                                    &window_aggregate_function)) {
+      THROW_SQL_ERROR()
+          << "Unexpected expression in window aggregation function";
+    }
+    L::LogicalPtr sorted_logical_plan;
+
+    // Get the sorted logical plan
+    const std::string window_name = window_aggregate_function->window_name();
+    if (!window_name.empty()) {
+      sorted_logical_plan = sorted_window_map.at(window_name).logical_plan;
+    } else {
+      sorted_logical_plan = resolveSortInWindow(logical_plan,
+                                                window_aggregate_function->window_info());
+    }
+
+    logical_plan = L::WindowAggregate::Create(sorted_logical_plan,
+                                              alias);
+  }
+
   // Resolve GROUP BY.
   std::vector<E::NamedExpressionPtr> group_by_expressions;
   if (select_query.group_by() != nullptr) {
@@ -1039,7 +1129,7 @@ L::LogicalPtr Resolver::resolveSelect(
   E::PredicatePtr having_predicate;
   if (select_query.having() != nullptr) {
     ExpressionResolutionInfo expr_resolution_info(
-        *name_resolver, &query_aggregation_info, &select_list_info);
+        *name_resolver, &query_aggregation_info, &window_aggregation_info, &select_list_info);
     having_predicate = resolvePredicate(
         *select_query.having()->having_predicate(), &expr_resolution_info);
   }
@@ -1053,7 +1143,7 @@ L::LogicalPtr Resolver::resolveSelect(
     for (const ParseOrderByItem &order_by_item :
          *select_query.order_by()->order_by_items()) {
       ExpressionResolutionInfo expr_resolution_info(
-          *name_resolver, &query_aggregation_info, &select_list_info);
+          *name_resolver, &query_aggregation_info, &window_aggregation_info, &select_list_info);
       E::ScalarPtr order_by_scalar = resolveExpression(
           *order_by_item.ordering_expression(),
           nullptr,  // No Type hint.
@@ -1528,6 +1618,89 @@ L::LogicalPtr Resolver::RenameOutputColumns(
   return L::Project::Create(logical_plan, project_expressions);
 }
 
+// Resolves a parsed window definition into an E::WindowInfo.
+//
+// Every PARTITION BY and ORDER BY item is resolved as an expression and must
+// be a non-constant, plain attribute reference; otherwise a SQL error is
+// thrown at the offending item. The optional frame clause is copied into a
+// WindowFrameInfo.
+E::WindowInfo Resolver::resolveWindow(const ParseWindow &parse_window,
+                                      const NameResolver &name_resolver) {
+  std::vector<E::AttributeReferencePtr> partition_by_attributes;
+  std::vector<E::AttributeReferencePtr> order_by_attributes;
+  // Parallel to order_by_attributes: true means ascending order.
+  std::vector<bool> order_by_directions;
+  // Parallel to order_by_attributes: the NULLS FIRST flag of each item.
+  std::vector<bool> nulls_first;
+  // Stays null when the window has no frame clause.
+  E::WindowFrameInfo *frame_info = nullptr;
+
+  // Resolve PARTITION BY.
+  if (parse_window.partition_by_expressions() != nullptr) {
+    for (const ParseExpression &unresolved_partition_by_expression :
+         *parse_window.partition_by_expressions()) {
+      // A fresh resolution info per item, tagged with the clause name.
+      ExpressionResolutionInfo expr_resolution_info(
+          name_resolver,
+          "PARTITION BY clause" /* clause_name */,
+          nullptr /* select_list_info */);
+      E::ScalarPtr partition_by_scalar = resolveExpression(
+          unresolved_partition_by_expression,
+          nullptr,  // No Type hint.
+          &expr_resolution_info);
+
+      if (partition_by_scalar->isConstant()) {
+        THROW_SQL_ERROR_AT(&unresolved_partition_by_expression)
+            << "Constant expression not allowed in PARTITION BY";
+      }
+
+      // Only a bare attribute reference is accepted as a partition key.
+      E::AttributeReferencePtr partition_by_attribute;
+      if (!E::SomeAttributeReference::MatchesWithConditionalCast(partition_by_scalar,
+                                                                 &partition_by_attribute)) {
+        THROW_SQL_ERROR_AT(&unresolved_partition_by_expression)
+            << "Only attribute name allowed in PARTITION BY in window definition";
+      }
+
+      partition_by_attributes.push_back(partition_by_attribute);
+    }
+  }
+
+  // Resolve ORDER BY.
+  if (parse_window.order_by_expressions() != nullptr) {
+    for (const ParseOrderByItem &order_by_item :
+         *parse_window.order_by_expressions()) {
+      // A fresh resolution info per item, tagged with the clause name.
+      ExpressionResolutionInfo expr_resolution_info(
+          name_resolver,
+          "ORDER BY clause" /* clause name */,
+          nullptr /* select_list_info */);
+      E::ScalarPtr order_by_scalar = resolveExpression(
+          *order_by_item.ordering_expression(),
+          nullptr,  // No Type hint.
+          &expr_resolution_info);
+
+      if (order_by_scalar->isConstant()) {
+        THROW_SQL_ERROR_AT(&order_by_item)
+            << "Constant expression not allowed in ORDER BY";
+      }
+
+      // Only a bare attribute reference is accepted as an ordering key.
+      E::AttributeReferencePtr order_by_attribute;
+      if (!E::SomeAttributeReference::MatchesWithConditionalCast(order_by_scalar,
+                                                                 &order_by_attribute)) {
+        THROW_SQL_ERROR_AT(&order_by_item)
+            << "Only attribute name allowed in ORDER BY in window definition";
+      }
+
+      // Keep the three ORDER BY vectors aligned, one entry per item.
+      order_by_attributes.push_back(order_by_attribute);
+      order_by_directions.push_back(order_by_item.is_ascending());
+      nulls_first.push_back(order_by_item.nulls_first());
+    }
+  }
+
+  // Resolve window frame.
+  if (parse_window.frame_info() != nullptr) {
+    const quickstep::ParseFrameInfo *parse_frame_info = parse_window.frame_info();
+    // NOTE(review): raw allocation; presumably E::WindowInfo takes ownership
+    // of frame_info. Confirm against WindowInfo, otherwise this leaks.
+    frame_info = new E::WindowFrameInfo(parse_frame_info->is_row,
+                                        parse_frame_info->num_preceding,
+                                        parse_frame_info->num_following);
+  }
+
+  return E::WindowInfo(partition_by_attributes,
+                       order_by_attributes,
+                       order_by_directions,
+                       nulls_first,
+                       frame_info);
+}
+
 const CatalogRelation* Resolver::resolveRelationName(
     const ParseString *relation_name) {
   const CatalogRelation *relation =
@@ -1684,13 +1857,45 @@ L::LogicalPtr Resolver::resolveJoinedTableReference(
   THROW_SQL_ERROR_AT(&joined_table_reference) << "Full outer join is not supported yet";
 }
 
+L::LogicalPtr Resolver::resolveSortInWindow(
+    const L::LogicalPtr &logical_plan,
+    const E::WindowInfo &window_info) {
+  const auto num_partition_keys = window_info.partition_by_attributes.size();
+
+  // Sort keys: the PARTITION BY attributes first, then the ORDER BY ones.
+  std::vector<E::AttributeReferencePtr> keys(window_info.partition_by_attributes);
+  keys.insert(keys.end(),
+              window_info.order_by_attributes.begin(),
+              window_info.order_by_attributes.end());
+
+  // Partition keys are sorted ascending; ORDER BY keys keep the directions
+  // given in the window definition.
+  std::vector<bool> ascending(num_partition_keys, true);
+  ascending.insert(ascending.end(),
+                   window_info.order_by_directions.begin(),
+                   window_info.order_by_directions.end());
+
+  // Partition keys use nulls-first = false; ORDER BY keys keep the flags
+  // given in the window definition.
+  std::vector<bool> nulls_first_flags(num_partition_keys, false);
+  nulls_first_flags.insert(nulls_first_flags.end(),
+                           window_info.nulls_first.begin(),
+                           window_info.nulls_first.end());
+
+  return L::Sort::Create(logical_plan,
+                         keys,
+                         ascending,
+                         nulls_first_flags,
+                         -1 /* limit */);
+}
+
 void Resolver::resolveSelectClause(
     const ParseSelectionClause &parse_selection,
     const std::string &select_name,
     const std::vector<const Type*> *type_hints,
     const NameResolver &name_resolver,
     QueryAggregationInfo *query_aggregation_info,
-    std::vector<expressions::NamedExpressionPtr> *project_expressions,
+    WindowAggregationInfo *window_aggregation_info,
+    std::vector<E::NamedExpressionPtr> *project_expressions,
     std::vector<bool> *has_aggregate_per_expression) {
   project_expressions->clear();
   switch (parse_selection.getSelectionType()) {
@@ -1720,6 +1925,7 @@ void Resolver::resolveSelectClause(
         ExpressionResolutionInfo expr_resolution_info(
             name_resolver,
             query_aggregation_info,
+            window_aggregation_info,
             nullptr /* select_list_info */);
         const E::ScalarPtr project_scalar =
             resolveExpression(*parse_project_expression,
@@ -2362,16 +2568,12 @@ E::ScalarPtr Resolver::resolveSimpleCaseExpression(
 
 // TODO(chasseur): For now this only handles resolving aggregate functions. In
 // the future it should be extended to resolve scalar functions as well.
+// TODO(Shixuan): This will handle resolving window aggregation function as well,
+// which could be extended to general scalar functions.
 E::ScalarPtr Resolver::resolveFunctionCall(
     const ParseFunctionCall &parse_function_call,
     ExpressionResolutionInfo *expression_resolution_info) {
-  std::string function_name = ToLower(parse_function_call.name()->value());
-
-  // TODO(Shixuan): Add support for window aggregation function.
-  if (parse_function_call.isWindow()) {
-    THROW_SQL_ERROR_AT(&parse_function_call)
-        << "Window Aggregation Function is not supported currently";
-  }
+  const std::string function_name = ToLower(parse_function_call.name()->value());
 
   // First check for the special case COUNT(*).
   bool count_star = false;
@@ -2386,8 +2588,9 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   std::vector<E::ScalarPtr> resolved_arguments;
   const PtrList<ParseExpression> *unresolved_arguments =
       parse_function_call.arguments();
-  // The first aggregate function in the arguments.
+  // The first aggregate function and window aggregate function in the arguments.
   const ParseTreeNode *first_aggregate_function = nullptr;
+  const ParseTreeNode *first_window_aggregate_function = nullptr;
   if (unresolved_arguments != nullptr) {
     for (const ParseExpression &unresolved_argument : *unresolved_arguments) {
       ExpressionResolutionInfo expr_resolution_info(
@@ -2401,6 +2604,13 @@ E::ScalarPtr Resolver::resolveFunctionCall(
         first_aggregate_function =
             expr_resolution_info.parse_aggregate_expression;
       }
+
+      if (expr_resolution_info.hasWindowAggregate() &&
+          first_window_aggregate_function == nullptr &&
+          parse_function_call.isWindow()) {
+          first_window_aggregate_function =
+              expr_resolution_info.parse_window_aggregate_expression;
+      }
     }
   }
 
@@ -2431,6 +2641,15 @@ E::ScalarPtr Resolver::resolveFunctionCall(
         << "Aggregation of Aggregates are not allowed";
   }
 
+  // TODO(Shixuan): We currently don't support nested window aggregation since
+  // TPC-DS doesn't have that. However, it is essentially a nested scalar
+  // function, which should be supported in the future version of Quickstep.
+  if (parse_function_call.isWindow() &&
+      first_window_aggregate_function != nullptr) {
+    THROW_SQL_ERROR_AT(first_window_aggregate_function)
+        << "Window aggregation of window aggregates is not allowed";
+  }
+
   // Make sure a naked COUNT() with no arguments wasn't specified.
   if ((aggregate->getAggregationID() == AggregationID::kCount)
       && (resolved_arguments.empty())
@@ -2452,6 +2671,13 @@ E::ScalarPtr Resolver::resolveFunctionCall(
         << " can not apply to the given argument(s).";
   }
 
+  if (parse_function_call.isWindow()) {
+    return resolveWindowAggregateFunction(parse_function_call,
+                                          expression_resolution_info,
+                                          aggregate,
+                                          resolved_arguments);
+  }
+
   // Create the optimizer representation of the resolved aggregate and an alias
   // for it to appear in the output relation.
   const E::AggregateFunctionPtr aggregate_function
@@ -2471,6 +2697,62 @@ E::ScalarPtr Resolver::resolveFunctionCall(
   return E::ToRef(aggregate_alias);
 }
 
+// Resolves a window aggregate function call whose aggregate and arguments have
+// already been resolved by the caller.
+//
+// The window is either looked up by name in the query's window map or resolved
+// from the inline definition. The resulting function is wrapped in an Alias,
+// recorded in window_aggregation_info, and returned as a reference to that
+// alias. Throws a SQL error for an undefined window name or when the query
+// already contains a window aggregate.
+E::ScalarPtr Resolver::resolveWindowAggregateFunction(
+    const ParseFunctionCall &parse_function_call,
+    ExpressionResolutionInfo *expression_resolution_info,
+    const ::quickstep::AggregateFunction *window_aggregate,
+    const std::vector<E::ScalarPtr> &resolved_arguments) {
+  WindowAggregationInfo *window_aggregation_info =
+      expression_resolution_info->window_aggregation_info;
+  DCHECK(window_aggregation_info != nullptr);
+
+  // A window aggregate function might be defined OVER a window name or a window.
+  E::WindowAggregateFunctionPtr window_aggregate_function;
+  if (parse_function_call.window_name() != nullptr) {
+    // Bind by reference: copying the whole window map per call is wasteful.
+    const std::unordered_map<std::string, WindowPlan> &window_map =
+        window_aggregation_info->window_map;
+    const std::string &window_name = parse_function_call.window_name()->value();
+    const std::unordered_map<std::string, WindowPlan>::const_iterator map_it =
+        window_map.find(window_name);
+
+    if (map_it == window_map.end()) {
+      THROW_SQL_ERROR_AT(parse_function_call.window_name())
+          << "Undefined window " << window_name;
+    }
+
+    window_aggregate_function =
+        E::WindowAggregateFunction::Create(*window_aggregate,
+                                           resolved_arguments,
+                                           map_it->second.window_info,
+                                           window_name,
+                                           parse_function_call.is_distinct());
+  } else {
+    E::WindowInfo resolved_window = resolveWindow(*parse_function_call.window(),
+                                                  expression_resolution_info->name_resolver);
+
+    window_aggregate_function =
+        E::WindowAggregateFunction::Create(*window_aggregate,
+                                           resolved_arguments,
+                                           resolved_window,
+                                           "" /* window name */,
+                                           parse_function_call.is_distinct());
+  }
+
+  // Reject a second window aggregate before an expression id is consumed for
+  // an alias that would never be used.
+  if (!window_aggregation_info->window_aggregate_expressions.empty()) {
+    THROW_SQL_ERROR_AT(&parse_function_call)
+        << "Currently we only support single window aggregate in the query";
+  }
+
+  // Number the alias by the window aggregates collected so far, not by the
+  // regular aggregates in query_aggregation_info (which may be NULL).
+  const std::string internal_alias = GenerateWindowAggregateAttributeAlias(
+      window_aggregation_info->window_aggregate_expressions.size());
+  const E::AliasPtr aggregate_alias = E::Alias::Create(context_->nextExprId(),
+                                                       window_aggregate_function,
+                                                       "" /* attribute_name */,
+                                                       internal_alias,
+                                                       "$window_aggregate" /* relation_name */);
+
+  window_aggregation_info->window_aggregate_expressions.emplace_back(aggregate_alias);
+  // Remember where the window aggregate appeared so that enclosing calls can
+  // detect nesting.
+  expression_resolution_info->parse_window_aggregate_expression = &parse_function_call;
+  return E::ToRef(aggregate_alias);
+}
+
 std::vector<E::PredicatePtr> Resolver::resolvePredicates(
     const PtrList<ParsePredicate> &parse_predicates,
     ExpressionResolutionInfo *expression_resolution_info) {
@@ -2794,16 +3076,20 @@ void Resolver::rewriteIfOrdinalReference(
   }
 }
 
+std::string Resolver::GenerateWindowAggregateAttributeAlias(int index) {
+  // Internal alias: the fixed "$window_aggregate" prefix followed by the
+  // decimal index.
+  std::string alias("$window_aggregate");
+  alias += std::to_string(index);
+  return alias;
+}
+
 std::string Resolver::GenerateAggregateAttributeAlias(int index) {
-  return std::string("$aggregate").append(std::to_string(index));
+  return "$aggregate" + std::to_string(index);
 }
 
 std::string Resolver::GenerateGroupingAttributeAlias(int index) {
-  return std::string("$groupby").append(std::to_string(index));
+  return "$groupby" + std::to_string(index);
 }
 
 std::string Resolver::GenerateOrderingAttributeAlias(int index) {
-  return std::string("$orderby").append(std::to_string(index));
+  return "$orderby" + std::to_string(index);
 }
 
 }  // namespace resolver

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/resolver/Resolver.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/resolver/Resolver.hpp b/query_optimizer/resolver/Resolver.hpp
index a84c61c..f4024e9 100644
--- a/query_optimizer/resolver/Resolver.hpp
+++ b/query_optimizer/resolver/Resolver.hpp
@@ -24,11 +24,13 @@
 #include <vector>
 
 #include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/expressions/SubqueryExpression.hpp"
 #include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/expressions/WindowAggregateFunction.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
@@ -65,6 +67,7 @@ class ParseSubqueryExpression;
 class ParseTableReference;
 class ParseTableReferenceSignature;
 class ParseTreeNode;
+class ParseWindow;
 template <class T>
 class PtrList;
 class StorageBlockLayoutDescription;
@@ -123,6 +126,17 @@ class Resolver {
   struct QueryAggregationInfo;
 
   /**
+   * @brief Query-scoped info that contains window aggregate expressions and a
+   *        window map.
+   **/
+  struct WindowAggregationInfo;
+
+  /**
+   * @brief A wrapper for resolved window and the corresponding sorted plan.
+   **/
+  struct WindowPlan;
+
+  /**
    * @brief Query-scoped info that contains select-list expressions
    *        and whether an expression is referenced by an
    *        ordinal or alias reference.
@@ -271,6 +285,8 @@ class Resolver {
    * @param name_resolver NameResolver to resolve the relation/attribute names.
    * @param query_aggregation_info Passed down to each expression to collects
    *                               aggregate expressions.
+   * @param window_aggregation_info Passed down to each expression to
+   *                                collect window aggregate expressions.
    * @param project_expressions Converted SELECT-list expressions.
    * @param has_aggregate_per_expression For each SELECT-list expression,
    *                                     indicates whether it contains
@@ -282,6 +298,7 @@ class Resolver {
       const std::vector<const Type*> *type_hints,
       const NameResolver &name_resolver,
       QueryAggregationInfo *query_aggregation_info,
+      WindowAggregationInfo *window_aggregation_info,
       std::vector<expressions::NamedExpressionPtr> *project_expressions,
       std::vector<bool> *has_aggregate_per_expression);
 
@@ -359,6 +376,17 @@ class Resolver {
       const ParseTableReferenceSignature &table_signature);
 
   /**
+   * @brief Sort the input table in (p_key, o_key) order specified by the window.
+   *
+   * @param logical_plan The input logical node.
+   * @param window_info The window that the input table has to be sorted accordingly.
+   * @return A logical plan that sorts the table according to window_info.
+   **/
+  logical::LogicalPtr resolveSortInWindow(
+      const logical::LogicalPtr &logical_plan,
+      const expressions::WindowInfo &window_info);
+
+  /**
    * @brief Resolves a parse expression and converts it to a scalar expression
    *        in the query optimizer. A non-scalar parse expression is resolved
    *        to an AttributeReference to another optimizer expression.
@@ -412,7 +440,8 @@ class Resolver {
    * @brief Resolves a function call. For a non-scalar function, the returned
    *        expression is an AttributeReference to the actual resolved expression.
    *
-   * @note This currently only handles resolving aggregate functions.
+   * @note This currently only handles resolving aggregate functions and window
+   *       aggregate functions.
    *
    * @param parse_function_call The function call to be resolved.
    * @param expression_resolution_info Resolution info that contains the name
@@ -425,6 +454,23 @@ class Resolver {
       ExpressionResolutionInfo *expression_resolution_info);
 
   /**
+   * @brief Resolves a window aggregate function.
+   *
+   * @param parse_function_call The function call to be resolved.
+   * @param expression_resolution_info Resolution info that contains the name
+   *                                   resolver and info to be updated after
+   *                                   resolution.
+   * @param aggregate The aggregate function.
+   * @param resolved_arguments The resolved arguments.
+   * @return An expression in the query optimizer.
+   */
+  expressions::ScalarPtr resolveWindowAggregateFunction(
+      const ParseFunctionCall &parse_function_call,
+      ExpressionResolutionInfo *expression_resolution_info,
+      const ::quickstep::AggregateFunction *aggregate,
+      const std::vector<expressions::ScalarPtr> &resolved_arguments);
+
+  /**
    * @brief Resolves a parse Predicate and converts it to a predicate in the
    *        query optimizer.
    *
@@ -469,6 +515,15 @@ class Resolver {
       const bool has_single_column);
 
   /**
+   * @brief Resolves a window definition.
+   *
+   * @param parse_window The parsed window definition.
+   * @param name_resolver The resolver to resolve names.
+   **/
+  expressions::WindowInfo resolveWindow(const ParseWindow &parse_window,
+                                        const NameResolver &name_resolver);
+
+  /**
    * @brief Resolves a relation name to a pointer to the corresponding
    *        CatalogRelation with the name.
    *
@@ -501,6 +556,15 @@ class Resolver {
   static std::string GenerateAggregateAttributeAlias(int index);
 
   /**
+   * @brief Generates an internal alias for a window aggregate attribute.
+   *
+   * @param index The index of the window aggregate attribute used for
+   *              generating the name.
+   * @return A string for the name.
+   */
+  static std::string GenerateWindowAggregateAttributeAlias(int index);
+
+  /**
    * @brief Generates an internal alias for a grouping attribute.
    *
    * @param index The index of the grouping attribute used for generating the

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/strategy/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/CMakeLists.txt b/query_optimizer/strategy/CMakeLists.txt
index 74f5a4b..84e151e 100644
--- a/query_optimizer/strategy/CMakeLists.txt
+++ b/query_optimizer/strategy/CMakeLists.txt
@@ -105,7 +105,8 @@ target_link_libraries(quickstep_queryoptimizer_strategy_OneToOne
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_physical_UpdateTable
                       quickstep_queryoptimizer_strategy_Strategy
-                      quickstep_utility_Macros)
+                      quickstep_utility_Macros
+                      quickstep_utility_SqlError)
 target_link_libraries(quickstep_queryoptimizer_strategy_Selection
                       glog
                       quickstep_queryoptimizer_LogicalToPhysicalMapper

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/strategy/OneToOne.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/strategy/OneToOne.cpp b/query_optimizer/strategy/OneToOne.cpp
index 7f59151..f49a25c 100644
--- a/query_optimizer/strategy/OneToOne.cpp
+++ b/query_optimizer/strategy/OneToOne.cpp
@@ -55,6 +55,7 @@
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
 #include "query_optimizer/physical/UpdateTable.hpp"
+#include "utility/SqlError.hpp"
 
 namespace quickstep {
 namespace optimizer {
@@ -208,6 +209,10 @@ bool OneToOne::generatePlan(const L::LogicalPtr &logical_input,
           update_table->predicate());
       return true;
     }
+    case L::LogicalType::kWindowAggregate: {
+      THROW_SQL_ERROR()
+          << "Window aggregate function is not supported currently :(";
+    }
     default:
       return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/tests/logical_generator/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/logical_generator/Select.test b/query_optimizer/tests/logical_generator/Select.test
index 3c152e8..e0003bf 100644
--- a/query_optimizer/tests/logical_generator/Select.test
+++ b/query_optimizer/tests/logical_generator/Select.test
@@ -1354,3 +1354,165 @@ TopLevelPlan
 +-output_attributes=
   +-AttributeReference[id=5,name=x,relation=,type=Int]
   +-AttributeReference[id=6,name=y,relation=,type=Int]
+==
+
+# Window Aggregate Function Test.
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,false],nulls_first=[false,false]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | |   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| |   relation=$window_aggregate,type=Double NULL]
+| |   +-WindowAggregateFunction[function=AVG,window_name=w,is_ascending=[false],
+| |     nulls_first=[false],frame_mode=row,num_preceding=-1,num_following=0]
+| |     +-arguments=
+| |     | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |     +-partition_by=
+| |     | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| |     +-order_by=
+| |       +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+|       relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+FROM test;
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,true,false,true],
+| | | nulls_first=[false,false,false,true]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | |   +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   | type=VarChar(20) NULL]
+| | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | |   +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| |   relation=$window_aggregate,type=Double NULL]
+| |   +-WindowAggregateFunction[function=SUM,window_name=,
+| |     is_ascending=[false,true],nulls_first=[false,true],frame_mode=range,
+| |     num_preceding=3,num_following=3]
+| |     +-arguments=
+| |     | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |     +-partition_by=
+| |     | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| |     | | type=VarChar(20) NULL]
+| |     | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |     +-order_by=
+| |       +-AttributeReference[id=3,name=double_col,relation=test,
+| |       | type=Double NULL]
+| |       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| +-project_list=
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   +-Alias[id=6,name=,alias=sum(float_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+|       relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+  +-AttributeReference[id=6,name=,alias=sum(float_col),relation=,
+    type=Double NULL]
+==
+
+SELECT sum(avg(int_col) OVER w) FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=WindowAggregate
+| | | +-input=Sort[is_ascending=[true,true],nulls_first=[false,false]]
+| | | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | | | | type=Double NULL]
+| | | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | |   type=VarChar(20) NULL]
+| | | | +-sort_expressions=
+| | | |   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | |   relation=$window_aggregate,type=Double NULL]
+| | |   +-WindowAggregateFunction[function=AVG,window_name=w,
+| | |     is_ascending=[true],nulls_first=[false],frame_mode=row,
+| | |     num_preceding=-1,num_following=0]
+| | |     +-arguments=
+| | |     | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |     +-partition_by=
+| | |     | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | |     +-order_by=
+| | |       +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=7,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| |     +-AggregateFunction[function=SUM]
+| |       +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| |         relation=$window_aggregate,type=Double NULL]
+| +-project_list=
+|   +-Alias[id=7,name=,alias=sum(avg(int_col)),relation=,type=Double NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate0,relation=$aggregate,
+|       type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=7,name=,alias=sum(avg(int_col)),relation=,
+    type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER w1 FROM test
+WINDOW w2 AS
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING);
+--
+ERROR: Undefined window w1 (1 : 37)
+SELECT int_col, sum(float_col) OVER w1 FROM test
+                                    ^
+==
+
+SELECT sum(avg(int_col)) OVER w FROM test
+WINDOW w AS
+(PARTITION BY double_col
+ ORDER BY char_col)
+--
+ERROR: Aggregation of Aggregates are not allowed (1 : 12)
+SELECT sum(avg(int_col)) OVER w FROM test
+           ^

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/714874ce/query_optimizer/tests/resolver/Select.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/resolver/Select.test b/query_optimizer/tests/resolver/Select.test
index 141bfa0..89ab84d 100644
--- a/query_optimizer/tests/resolver/Select.test
+++ b/query_optimizer/tests/resolver/Select.test
@@ -3126,3 +3126,165 @@ FROM test;
 ERROR: The substring length must be greater than 0 (1 : 8)
 SELECT SUBSTRING(char_col FROM 1 FOR ...
        ^
+==
+
+# Window Aggregate Function Test.
+SELECT avg(int_col) OVER w FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col DESC NULLS LAST
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,false],nulls_first=[false,false]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | |   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| |   relation=$window_aggregate,type=Double NULL]
+| |   +-WindowAggregateFunction[function=AVG,window_name=w,is_ascending=[false],
+| |     nulls_first=[false],frame_mode=row,num_preceding=-1,num_following=0]
+| |     +-arguments=
+| |     | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |     +-partition_by=
+| |     | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| |     +-order_by=
+| |       +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+|       relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=avg(int_col),relation=,type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING)
+FROM test;
+--
+TopLevelPlan
++-plan=Project
+| +-input=WindowAggregate
+| | +-input=Sort[is_ascending=[true,true,false,true],
+| | | nulls_first=[false,false,false,true]]
+| | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-sort_expressions=
+| | |   +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   | type=VarChar(20) NULL]
+| | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | |   +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| |   relation=$window_aggregate,type=Double NULL]
+| |   +-WindowAggregateFunction[function=SUM,window_name=,
+| |     is_ascending=[false,true],nulls_first=[false,true],frame_mode=range,
+| |     num_preceding=3,num_following=3]
+| |     +-arguments=
+| |     | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |     +-partition_by=
+| |     | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| |     | | type=VarChar(20) NULL]
+| |     | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |     +-order_by=
+| |       +-AttributeReference[id=3,name=double_col,relation=test,
+| |       | type=Double NULL]
+| |       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| +-project_list=
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   +-Alias[id=6,name=,alias=sum(float_col),relation=,type=Double NULL]
+|     +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+|       relation=$window_aggregate,type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+  +-AttributeReference[id=6,name=,alias=sum(float_col),relation=,
+    type=Double NULL]
+==
+
+SELECT sum(avg(int_col) OVER w) FROM test
+WINDOW w AS
+(PARTITION BY char_col
+ ORDER BY long_col
+ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
+--
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=WindowAggregate
+| | | +-input=Sort[is_ascending=[true,true],nulls_first=[false,false]]
+| | | | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | | +-AttributeReference[id=3,name=double_col,relation=test,
+| | | | | | type=Double NULL]
+| | | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | | |   type=VarChar(20) NULL]
+| | | | +-sort_expressions=
+| | | |   +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-window_aggregate_expression=Alias[id=6,name=,alias=$window_aggregate0,
+| | |   relation=$window_aggregate,type=Double NULL]
+| | |   +-WindowAggregateFunction[function=AVG,window_name=w,
+| | |     is_ascending=[true],nulls_first=[false],frame_mode=row,
+| | |     num_preceding=-1,num_following=0]
+| | |     +-arguments=
+| | |     | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |     +-partition_by=
+| | |     | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | |     +-order_by=
+| | |       +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=7,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| |     +-AggregateFunction[function=SUM]
+| |       +-AttributeReference[id=6,name=,alias=$window_aggregate0,
+| |         relation=$window_aggregate,type=Double NULL]
+| +-project_list=
+|   +-Alias[id=7,name=,alias=sum(avg(int_col)),relation=,type=Double NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate0,relation=$aggregate,
+|       type=Double NULL]
++-output_attributes=
+  +-AttributeReference[id=7,name=,alias=sum(avg(int_col)),relation=,
+    type=Double NULL]
+==
+
+SELECT int_col, sum(float_col) OVER w1 FROM test
+WINDOW w2 AS
+(PARTITION BY vchar_col, long_col
+ ORDER BY double_col DESC NULLS LAST, int_col ASC NULLS FIRST
+ RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING);
+--
+ERROR: Undefined window w1 (1 : 37)
+SELECT int_col, sum(float_col) OVER w1 FROM test
+                                    ^
+==
+
+SELECT sum(avg(int_col)) OVER w FROM test
+WINDOW w AS
+(PARTITION BY double_col
+ ORDER BY char_col)
+--
+ERROR: Aggregation of Aggregates are not allowed (1 : 12)
+SELECT sum(avg(int_col)) OVER w FROM test
+           ^


[09/11] incubator-quickstep git commit: Bug fix in initialization of probabilities.

Posted by hb...@apache.org.
Bug fix in initialization of probabilities.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/35bf47e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/35bf47e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/35bf47e9

Branch: refs/heads/scheduler++
Commit: 35bf47e9db81cb6a598ea8cbb9a06493a7fe3214
Parents: 6183e39
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:57:01 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp                |  7 +++--
 query_execution/tests/Learner_unittest.cpp | 38 +++++++++++++++++++++++++
 2 files changed, 42 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/35bf47e9/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 5d877b4..38a773b 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -184,9 +184,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
   for (auto priority_iter = execution_stats_.cbegin();
        priority_iter != execution_stats_.cend();
        ++priority_iter) {
-    sum_priority_levels += priority_iter->second.size();
-    priority_levels.emplace_back(priority_iter->first);
-    numerators.emplace_back(priority_iter->first);
+    const std::size_t curr_priority_level = priority_iter->first;
+    sum_priority_levels += curr_priority_level;
+    priority_levels.emplace_back(curr_priority_level);
+    numerators.emplace_back(curr_priority_level);
   }
   if (sum_priority_levels > 0) {
     probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/35bf47e9/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 74353f0..864bb22 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -84,4 +84,42 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 }
 
+TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+  Learner learner;
+  std::unique_ptr<QueryHandle> handle1, handle2;
+  const std::size_t kPriorityLevel1 = 1;
+  const std::size_t kPriorityLevel2 = 2;
+  handle1.reset(new QueryHandle(1, kPriorityLevel1));
+  handle2.reset(new QueryHandle(2, kPriorityLevel2));
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+  learner.addQuery(*handle1);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+  learner.addQuery(*handle2);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+  learner.removeQuery(handle2->query_id());
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+
+  learner.removeQuery(handle1->query_id());
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+}
+
 }  // namespace quickstep


[08/11] incubator-quickstep git commit: Added test for adding completion message feedback.

Posted by hb...@apache.org.
Added test for adding completion message feedback.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/cb519ebd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/cb519ebd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/cb519ebd

Branch: refs/heads/scheduler++
Commit: cb519ebd639ecdab512cb17e13740c06e5c19a4d
Parents: 9ad3281
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 12:09:22 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             |  1 +
 query_execution/Learner.cpp                |  4 +-
 query_execution/tests/Learner_unittest.cpp | 82 ++++++++++++++++++++++++-
 3 files changed, 82 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cb519ebd/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 3904185..ef1ce99 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -274,6 +274,7 @@ target_link_libraries(Learner_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_Learner
+                      quickstep_queryexecution_QueryExecutionMessages_proto                      
                       quickstep_queryoptimizer_QueryHandle)
 add_test(Learner_unittest Learner_unittest) 
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cb519ebd/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 0f17e7a..c7a7064 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -51,10 +51,11 @@ void Learner::addCompletionFeedback(
       workorder_completion_proto.execution_time_in_microseconds(),
       workorder_completion_proto.operator_index());
 
-  // updateProbability();
   if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
     updateFeedbackFromQueriesInPriorityLevel(priority_level);
   }
+  updateProbabilitiesForQueriesInPriorityLevel(priority_level, query_id);
+  updateProbabilitiesOfAllPriorityLevels();
 }
 
 void Learner::updateProbabilitiesForQueriesInPriorityLevel(
@@ -67,7 +68,6 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     return;
   } else if (execution_stats_[priority_level].size() == 1u) {
     DCHECK(current_probabilities_[priority_level] != nullptr);
-    DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
     // As we want the probability of the lone query in this priority level as
     // 1, we set the numerator same as denominator.
     const std::size_t numerator =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/cb519ebd/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index a1a144d..556c984 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -20,11 +20,26 @@
 #include "gtest/gtest.h"
 
 #include "query_execution/Learner.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_optimizer/QueryHandle.hpp"
 
 namespace quickstep {
 
-TEST(LearnerTest, AddAndRemoveQueryTest) {
+class LearnerTest : public ::testing::Test {
+ protected:
+  serialization::NormalWorkOrderCompletionMessage createMockCompletionMessage(
+      const std::size_t query_id, const std::size_t operator_id) {
+    serialization::NormalWorkOrderCompletionMessage mock_proto_message;
+    mock_proto_message.set_operator_index(operator_id);
+    mock_proto_message.set_query_id(query_id);
+    mock_proto_message.set_worker_thread_index(0);
+    mock_proto_message.set_execution_time_in_microseconds(10);
+
+    return mock_proto_message;
+  }
+};
+
+TEST_F(LearnerTest, AddAndRemoveQueryTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle;
   const std::size_t kPriorityLevel = 1;
@@ -53,7 +68,7 @@ TEST(LearnerTest, AddAndRemoveQueryTest) {
   EXPECT_FALSE(learner.hasActiveQueries());
 }
 
-TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle1, handle2;
   const std::size_t kPriorityLevel = 1;
@@ -85,7 +100,7 @@ TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 }
 
-TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
+TEST_F(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle1, handle2;
   const std::size_t kPriorityLevel1 = 1;
@@ -123,4 +138,65 @@ TEST(LearnerTest, MultipleQueriesDifferentPrioritiesAddRemoveTest) {
   EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
 }
 
+TEST_F(LearnerTest, AddCompletionFeedbackSamePriorityLevelTest) {
+  const std::size_t kPriorityLevel = 1;
+  Learner learner;
+
+  // The learner starts out with no queries.
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+
+  // Feedback for the first query must not change the number of active queries.
+  const std::unique_ptr<QueryHandle> first_query(new QueryHandle(1, kPriorityLevel));
+  learner.addQuery(*first_query);
+  learner.addCompletionFeedback(
+      createMockCompletionMessage(first_query->query_id(), 0));
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+  // The same holds for a second query in the same priority level.
+  const std::unique_ptr<QueryHandle> second_query(new QueryHandle(2, kPriorityLevel));
+  learner.addQuery(*second_query);
+  learner.addCompletionFeedback(
+      createMockCompletionMessage(second_query->query_id(), 0));
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+}
+
+TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
+  const std::size_t kPriorityLevel1 = 1;
+  const std::size_t kPriorityLevel2 = 2;
+  const std::size_t kNumIterations = 10;
+  Learner learner;
+
+  // The learner starts out with no queries.
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+
+  // Admit one query in each of the two priority levels.
+  const std::unique_ptr<QueryHandle> first_query(new QueryHandle(1, kPriorityLevel1));
+  const std::unique_ptr<QueryHandle> second_query(new QueryHandle(2, kPriorityLevel2));
+  learner.addQuery(*first_query);
+  learner.addQuery(*second_query);
+  EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+
+  // Alternate feedback between both queries; the counts must stay unchanged.
+  std::vector<QueryHandle*> queries;
+  queries.push_back(first_query.get());
+  queries.push_back(second_query.get());
+  for (std::size_t iteration = 0; iteration < kNumIterations; ++iteration) {
+    for (QueryHandle *query : queries) {
+      EXPECT_TRUE(learner.hasActiveQueries());
+      learner.addCompletionFeedback(
+          createMockCompletionMessage(query->query_id(), 0));
+      EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+      EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+      EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+    }
+  }
+}
 }  // namespace quickstep


[03/11] incubator-quickstep git commit: Added ExecutionStats class

Posted by hb...@apache.org.
Added ExecutionStats class

- To keep track of query execution statistics for a given query.
- The stats class organizes execution time on a per-operator basis.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/8e973b87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/8e973b87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/8e973b87

Branch: refs/heads/scheduler++
Commit: 8e973b873b1d3c96fc025845868b24db293f21c6
Parents: 714874c
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sun Jun 19 10:06:02 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt     |   6 ++
 query_execution/ExecutionStats.hpp | 177 ++++++++++++++++++++++++++++++++
 query_execution/PolicyEnforcer.hpp |   2 +-
 3 files changed, 184 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e973b87/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b031a44..fcd4f48 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -32,6 +32,7 @@ if (ENABLE_DISTRIBUTED)
   add_library(quickstep_queryexecution_BlockLocator BlockLocator.cpp BlockLocator.hpp)
 endif()
 add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitRequestMessage.hpp)
+add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
@@ -69,6 +70,9 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_utility_Macros
                         tmb)
 endif()
+target_link_libraries(quickstep_queryexecution_ExecutionStats
+                      glog
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_Foreman
                       ${GFLAGS_LIB_NAME} 
                       glog
@@ -91,6 +95,7 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
+                      quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -199,6 +204,7 @@ target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
 add_library(quickstep_queryexecution ../empty_src.cpp QueryExecutionModule.hpp)
 target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_AdmitRequestMessage
+                      quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_ForemanLite
                       quickstep_queryexecution_PolicyEnforcer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e973b87/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
new file mode 100644
index 0000000..f28f367
--- /dev/null
+++ b/query_execution/ExecutionStats.hpp
@@ -0,0 +1,177 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+/**
+ * @brief Record the execution stats of a query.
+ **/
+class ExecutionStats {
+ public:
+  /**
+   * @brief Constructor
+   *
+   * @param max_entries The maximum number of entries we remember for each
+   *        operator.
+   **/
+  explicit ExecutionStats(const std::size_t max_entries)
+      : max_entries_(max_entries), cached_stats_(std::make_pair(0, 0)) {}
+
+  /**
+   * @brief Get the number of active operators in stats.
+   **/
+  const std::size_t getNumActiveOperators() const {
+    return active_operators_.size();
+  }
+
+  /**
+   * @brief Get the current stats.
+   *
+   * @note The cache is refreshed lazily here rather than on every addEntry()
+   *       call, hence this function can't be const.
+   *
+   * @return A pair - 1st element is total time, 2nd element is total number of
+   *         WorkOrders for the whole query (the cached pair if either is 0).
+   **/
+  std::pair<std::uint64_t, std::uint64_t> getCurrentStats() {
+    if (active_operators_.empty()) {
+      return cached_stats_;
+    }
+    std::pair<std::uint64_t, std::uint64_t> result = std::make_pair(0, 0);
+    for (auto it = active_operators_.begin(); it != active_operators_.end(); ++it) {
+      DCHECK(it->second.get() != nullptr);
+      const std::pair<std::uint64_t, std::size_t> op_stats = it->second->getStats();
+      result.first += op_stats.first;
+      result.second += op_stats.second;
+    }
+    if (result.first == 0 || result.second == 0) {
+      // If either element of the pair is 0, use the old result.
+      return cached_stats_;
+    }
+    cached_stats_ = result;
+    return result;
+  }
+
+  /**
+   * @brief Add a new entry to stats.
+   *
+   * @param value The value to be added.
+   * @param operator_index The operator index which the value belongs to.
+   **/
+  void addEntry(std::size_t value, std::size_t operator_index) {
+    std::unique_ptr<OperatorStats> &op_stats = active_operators_[operator_index];
+    if (op_stats == nullptr) {
+      // First entry for this operator: create its stats before recording.
+      op_stats.reset(new OperatorStats(max_entries_));
+    }
+    op_stats->addEntry(value);
+  }
+
+  /**
+   * @brief Remove the operator with given index. This should be called only
+   *        when the given operator finishes its execution.
+   **/
+  void removeOperator(std::size_t operator_index) {
+    DCHECK(hasOperator(operator_index));
+    active_operators_.erase(operator_index);
+  }
+
+ private:
+  /**
+   * @brief Stats for an operator within the query.
+   *
+   * @note We remember only the last N entries for the operator.
+   **/
+  class OperatorStats {
+   public:
+    /**
+     * @brief Constructor.
+     *
+     * @param max_entries The maximum number of entries we remember, i.e. the
+     *        last N (=max_entries) entries. If 0, nothing is remembered.
+     **/
+    explicit OperatorStats(const std::size_t max_entries) : max_entries_(max_entries) {}
+
+    // Returns the pair (sum, count) of the remembered entries. The 64-bit seed
+    // keeps std::accumulate from summing (and truncating) in int.
+    inline std::pair<std::uint64_t, std::size_t> getStats() const {
+      const std::uint64_t zero = 0;
+      return std::make_pair(std::accumulate(times_.begin(), times_.end(), zero),
+                            times_.size());
+    }
+
+    // Remembers time_value, evicting the oldest entry when the window is full.
+    inline void addEntry(std::uint64_t time_value) {
+      if (max_entries_ == 0) {
+        return;  // Remember nothing; also avoids popping an empty deque.
+      }
+      if (times_.size() == max_entries_) {
+        times_.pop_front();
+      }
+      times_.push_back(time_value);
+      DCHECK_LE(times_.size(), max_entries_);
+    }
+
+   private:
+    const std::size_t max_entries_;
+    std::deque<std::uint64_t> times_;
+
+    DISALLOW_COPY_AND_ASSIGN(OperatorStats);
+  };
+
+  /**
+   * @brief Check if the operator with given index is present in the stats.
+   **/
+  inline bool hasOperator(const std::size_t operator_index) const {
+    return active_operators_.find(operator_index) != active_operators_.end();
+  }
+
+  const std::size_t max_entries_;
+
+  std::unordered_map<std::size_t, std::unique_ptr<OperatorStats>>
+      active_operators_;
+
+  // Cached stats for the whole query.
+  std::pair<std::uint64_t, std::uint64_t> cached_stats_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExecutionStats);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_EXECUTION_STATS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8e973b87/query_execution/PolicyEnforcer.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.hpp b/query_execution/PolicyEnforcer.hpp
index 470ff2a..b7f5735 100644
--- a/query_execution/PolicyEnforcer.hpp
+++ b/query_execution/PolicyEnforcer.hpp
@@ -214,4 +214,4 @@ class PolicyEnforcer {
 
 }  // namespace quickstep
 
-#endif  // QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_HPP_
+#endif  // QUICKSTEP_QUERY_EXECUTION_POLICY_ENFORCER_HPP_


[11/11] incubator-quickstep git commit: Added GFLAG to learner.

Posted by hb...@apache.org.
Added GFLAG to learner.

- To control the number of work order execution statistics
  that are maintained in the Learner, for a given query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/9ad3281f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/9ad3281f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/9ad3281f

Branch: refs/heads/scheduler++
Commit: 9ad3281f2c358a4232413792565e573cfa358c9f
Parents: 35bf47e
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 11:39:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.cpp                | 25 +++++++-
 query_execution/Learner.hpp                | 76 ++++++++++++-------------
 query_execution/tests/Learner_unittest.cpp | 17 +++---
 3 files changed, 69 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ad3281f/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 38a773b..0f17e7a 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -34,6 +34,11 @@
 
 namespace quickstep {
 
+DEFINE_uint64(max_past_entries_learner,
+              10,
+              "The maximum number of past WorkOrder execution statistics"
+              " entries for a query");
+
 void Learner::addCompletionFeedback(
     const serialization::NormalWorkOrderCompletionMessage
         &workorder_completion_proto) {
@@ -90,8 +95,7 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
   }
 }
 
-void Learner::updateProbabilitiesOfAllPriorityLevels(
-    const std::size_t priority_level) {
+void Learner::updateProbabilitiesOfAllPriorityLevels() {
   if (!hasFeedbackFromAllPriorityLevels() ||
       has_feedback_from_all_queries_.empty()) {
     // Either we don't have enough feedback messages from all the priority
@@ -114,7 +118,7 @@ void Learner::updateProbabilitiesOfAllPriorityLevels(
       total_time_curr_level += mean_workorder_entry.second;
     }
     const std::size_t num_queries_in_priority_level =
-        execution_stats_[priority_level].size();
+        execution_stats_[curr_priority_level].size();
     DCHECK_GT(num_queries_in_priority_level, 0u);
     predicted_time_for_level[curr_priority_level] =
         total_time_curr_level / num_queries_in_priority_level;
@@ -195,4 +199,19 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
   }
 }
 
+void Learner::initializeQuery(const QueryHandle &query_handle) {
+  const std::size_t query_id = query_handle.query_id();
+  const std::size_t priority_level = query_handle.query_priority();
+  DCHECK(isPriorityLevelPresent(priority_level));
+  // The max_past_entries_learner flag bounds the history kept for the query.
+  execution_stats_[priority_level].emplace_back(
+      query_id,
+      std::unique_ptr<ExecutionStats>(
+          new ExecutionStats(FLAGS_max_past_entries_learner)));
+  query_id_to_priority_lookup_[query_id] = priority_level;
+  // A freshly initialized query cannot have produced any feedback yet, so the
+  // priority level no longer has feedback from all of its queries.
+  has_feedback_from_all_queries_[priority_level] = false;
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ad3281f/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index fb0e4cb..073b693 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -30,15 +30,10 @@
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
-#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 namespace quickstep {
 
-/*DECLARE_int32(max_past_entries_learner,
-              10,
-              "The maximum number of past WorkOrder execution statistics"
-              " entries for a query");*/
 /** \addtogroup QueryExecution
  *  @{
  */
@@ -94,14 +89,6 @@ class Learner {
     }
   }
 
-  void updateProbabilitiesForQueriesInPriorityLevel(
-      const std::size_t priority_level, const std::size_t query_id);
-
-  // TODO(harshad) - Cache internal results from previous invocation of this
-  // function and reuse them. There's a lot of redundancy in computations
-  // at this point.
-  void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
-
   inline const bool hasActiveQueries() const {
     return !query_id_to_priority_lookup_.empty();
   }
@@ -122,6 +109,29 @@ class Learner {
 
  private:
   /**
+   * @brief Update the probabilities for queries in the given priority level.
+   *
+   * @note This function is called after the learner receives a completion
+   *       feedback message from a given query.
+   *
+   * @param priority_level The priority level.
+   * @param query_id The ID of the query for which a completion feedback message
+   *        has been received.
+   *
+   **/
+  void updateProbabilitiesForQueriesInPriorityLevel(
+      const std::size_t priority_level, const std::size_t query_id);
+
+  /**
+   * @brief Update the probabilities of all the priority levels.
+   *
+   * TODO(harshad) - Cache internal results from previous invocation of this
+   * function and reuse them. There's a lot of redundancy in computations
+   * at this point.
+   **/
+  void updateProbabilitiesOfAllPriorityLevels();
+
+  /**
    * @brief Initialize the default probabilities for the queries.
    **/
   void initializeDefaultProbabilitiesForAllQueries();
@@ -135,18 +145,14 @@ class Learner {
    * @brief Initialize the data structures for a given priority level, if none
    *        exist. If there are already data structures for the given priority
    *        level, do nothing.
+   *
+   * @note This function should be followed by a relearn() call, to insert this
+   *       priority level into probabilities_of_priority_levels_.
    **/
   inline void initializePriorityLevelIfNotPresent(
       const std::size_t priority_level) {
     if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
-      // Calculate the default probability for the priority level here and use
-      // it instead of 0.5 here.
-      // TODO(harshad) - Correct this.
-      /*const float new_denominator =
-          probabilities_of_priority_levels_[priority_level]->getDenominator();
-      probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
-          priority_level, priority_level, new_denominator);*/
       execution_stats_[priority_level];
     }
   }
@@ -169,6 +175,10 @@ class Learner {
 
   /**
    * @brief Check if the Learner has presence of the given priority level.
+   *
+   * @param priority_level The priority level.
+   *
+   * @return True if present, false otherwise.
    **/
   inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
     DCHECK_EQ((current_probabilities_.find(priority_level) ==
@@ -178,7 +188,7 @@ class Learner {
   }
 
   /**
-   * @brief Check if the query is present.
+   * @brief Check if the query is present in local data structures.
    **/
   inline bool isQueryPresent(const std::size_t query_id) const {
     return query_id_to_priority_lookup_.find(query_id) !=
@@ -190,20 +200,7 @@ class Learner {
    *
    * @param query_handle The query handle for the new query.
    **/
-  void initializeQuery(const QueryHandle &query_handle) {
-    const std::size_t priority_level = query_handle.query_priority();
-    const std::size_t query_id = query_handle.query_id();
-    DCHECK(isPriorityLevelPresent(priority_level));
-    query_id_to_priority_lookup_[query_id] = priority_level;
-    execution_stats_[priority_level].emplace_back(
-        query_id,
-        std::unique_ptr<ExecutionStats>(
-            // new ExecutionStats(FLAGS_max_past_entries_learner)));
-            new ExecutionStats(10)));
-    // As we are initializing the query, we obviously haven't gotten any
-    // feedback message for this query. Hence mark the following field as false.
-    has_feedback_from_all_queries_[priority_level] = false;
-  }
+  void initializeQuery(const QueryHandle &query_handle);
 
   /**
    * @brief Get the execution stats object for the given query.
@@ -222,9 +219,10 @@ class Learner {
   }
 
   /**
-   * @brief This function works well when the query and priority level exists
-   *        in the data structures.
+   * @brief Get a mutable iterator to the execution stats for a given query.
    *
+   * @note This function works well when the query and priority level exists
+   *       in the data structures.
    **/
   inline std::vector<
       std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
@@ -244,6 +242,9 @@ class Learner {
     return stats_iter;
   }
 
+  /**
+   * @brief Get a query's priority level given its ID.
+   **/
   inline const std::size_t getQueryPriority(const std::size_t query_id) const {
     const auto it = query_id_to_priority_lookup_.find(query_id);
     DCHECK(it != query_id_to_priority_lookup_.end());
@@ -366,7 +367,6 @@ class Learner {
 
   // Key = priority level. Value = A boolean that indicates if we have received
   // feedback from all the queries in the given priority level.
-  // TODO(harshad) - Invalidate the cache whenever needed.
   std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
   DISALLOW_COPY_AND_ASSIGN(Learner);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/9ad3281f/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 864bb22..a1a144d 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -27,28 +27,29 @@ namespace quickstep {
 TEST(LearnerTest, AddAndRemoveQueryTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle;
-  const std::size_t kPriorityLevel1 = 1;
-  handle.reset(new QueryHandle(1, kPriorityLevel1));
+  const std::size_t kPriorityLevel = 1;
+  handle.reset(new QueryHandle(1, kPriorityLevel));
 
   EXPECT_FALSE(learner.hasActiveQueries());
   learner.addQuery(*handle);
   EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_TRUE(learner.hasActiveQueries());
+
   learner.removeQuery(handle->query_id());
   EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_FALSE(learner.hasActiveQueries());
 
-  const std::size_t kPriorityLevel2 = 1;
-  handle.reset(new QueryHandle(1, kPriorityLevel2));
+  handle.reset(new QueryHandle(2, kPriorityLevel));
   learner.addQuery(*handle);
   EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_TRUE(learner.hasActiveQueries());
+
   learner.removeQuery(handle->query_id());
   EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
-  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
   EXPECT_FALSE(learner.hasActiveQueries());
 }
 


[10/11] incubator-quickstep git commit: API to find the highest priority level in the learner

Posted by hb...@apache.org.
API to find the highest priority level in the learner

- Unit tests to test the feature.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5514e009
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5514e009
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5514e009

Branch: refs/heads/scheduler++
Commit: 5514e0090c1d753424601bc2f2cb165984b0766e
Parents: cb519eb
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 14:45:09 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             | 12 +++--
 query_execution/Learner.cpp                | 33 +++++++++++++
 query_execution/Learner.hpp                | 30 +++++++-----
 query_execution/ProbabilityStore.hpp       | 10 ++--
 query_execution/QueryExecutionTypedefs.hpp |  2 +
 query_execution/tests/Learner_unittest.cpp | 64 +++++++++++++++++++++++++
 6 files changed, 130 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index ef1ce99..4639617 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -76,7 +76,7 @@ target_link_libraries(quickstep_queryexecution_ExecutionStats
                       glog
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_Foreman
-                      ${GFLAGS_LIB_NAME} 
+                      ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_queryexecution_AdmitRequestMessage
                       quickstep_queryexecution_ForemanLite
@@ -99,7 +99,8 @@ target_link_libraries(quickstep_queryexecution_Learner
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_ProbabilityStore
-                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryoptimizer_QueryHandle
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
@@ -274,9 +275,10 @@ target_link_libraries(Learner_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_Learner
-                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryexecution_QueryExecutionMessages_proto
+                      quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryoptimizer_QueryHandle)
-add_test(Learner_unittest Learner_unittest) 
+add_test(Learner_unittest Learner_unittest)
 
 add_executable(ProbabilityStore_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
@@ -284,7 +286,7 @@ target_link_libraries(ProbabilityStore_unittest
                       gtest
                       gtest_main
                       quickstep_queryexecution_ProbabilityStore)
-add_test(ProbabilityStore_unittest ProbabilityStore_unittest) 
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest)
 
 add_executable(QueryManager_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index c7a7064..720df33 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,6 +26,7 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
@@ -39,6 +40,11 @@ DEFINE_uint64(max_past_entries_learner,
               "The maximum number of past WorkOrder execution statistics"
               " entries for a query");
 
+// Starts with an empty priority-level store and no highest priority level.
+Learner::Learner()
+    : probabilities_of_priority_levels_(new ProbabilityStore()),
+      highest_priority_level_(kInvalidPriorityLevel) {}
+
 void Learner::addCompletionFeedback(
     const serialization::NormalWorkOrderCompletionMessage
         &workorder_completion_proto) {
@@ -214,4 +220,31 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
   has_feedback_from_all_queries_[priority_level] = false;
 }
 
+void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (!execution_stats_[priority_level].empty()) {
+    // The priority level still has queries; keep it.
+    return;
+  }
+  execution_stats_.erase(priority_level);
+  current_probabilities_.erase(priority_level);
+  probabilities_of_priority_levels_->removeObject(priority_level);
+  has_feedback_from_all_queries_.erase(priority_level);
+  if (!hasActiveQueries()) {
+    highest_priority_level_ = kInvalidPriorityLevel;
+    return;
+  }
+  if (static_cast<int>(priority_level) != highest_priority_level_) {
+    return;
+  }
+  // The highest priority level is gone; find the highest remaining one.
+  std::size_t remaining_max = 0;
+  for (const auto &level_and_stats : execution_stats_) {
+    if (level_and_stats.first > remaining_max) {
+      remaining_max = level_and_stats.first;
+    }
+  }
+  highest_priority_level_ = static_cast<int>(remaining_max);
+}
+
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 073b693..f99b1c6 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -43,9 +43,7 @@ class Learner {
   /**
    * @brief Constructor.
    **/
-  Learner() {
-    probabilities_of_priority_levels_.reset(new ProbabilityStore());
-  }
+  Learner();
 
   void addCompletionFeedback(
       const serialization::NormalWorkOrderCompletionMessage
@@ -107,6 +105,16 @@ class Learner {
     return query_id_to_priority_lookup_.size();
   }
 
+  /**
+   * @brief Get the highest priority level among the active queries.
+   *
+   * @return The highest priority level. If the system is empty it returns
+   *         kInvalidPriorityLevel.
+   **/
+  inline const int getHighestPriorityLevel() const {
+    return highest_priority_level_;
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -151,9 +159,13 @@ class Learner {
    **/
   inline void initializePriorityLevelIfNotPresent(
       const std::size_t priority_level) {
+    CHECK_GT(priority_level, 0) << "Priority level should be non-zero";
     if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
       execution_stats_[priority_level];
+      if (static_cast<int>(priority_level) > highest_priority_level_) {
+        highest_priority_level_ = priority_level;
+      }
     }
   }
 
@@ -163,15 +175,7 @@ class Learner {
    *
    * @param priority_level The priority level.
    **/
-  inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
-    DCHECK(isPriorityLevelPresent(priority_level));
-    if (execution_stats_[priority_level].empty()) {
-      execution_stats_.erase(priority_level);
-      current_probabilities_.erase(priority_level);
-      probabilities_of_priority_levels_->removeObject(priority_level);
-      has_feedback_from_all_queries_.erase(priority_level);
-    }
-  }
+  void checkAndRemovePriorityLevel(const std::size_t priority_level);
 
   /**
    * @brief Check if the Learner has presence of the given priority level.
@@ -369,6 +373,8 @@ class Learner {
   // feedback from all the queries in the given priority level.
   std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
+  int highest_priority_level_;
+
   DISALLOW_COPY_AND_ASSIGN(Learner);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 347df89..233dd2e 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -212,9 +212,10 @@ class ProbabilityStore {
                                              cumulative_probability);
       cumulative_probability += p.second.second;
     }
-    // Adjust the last cumulative probability manually to 1.0, so that
+    DCHECK(!cumulative_probabilities_.empty());
+    // Adjust the last cumulative probability manually to 1, so that
     // floating addition related rounding issues are ignored.
-    cumulative_probabilities_.back().updateProbability(1.0);
+    cumulative_probabilities_.back().updateProbability(1);
   }
 
   /**
@@ -233,7 +234,9 @@ class ProbabilityStore {
    public:
     ProbabilityInfo(const std::size_t property, const float probability)
         : property_(property), probability_(probability) {
-      DCHECK_LE(probability, 1.0);
+      // As GLOG doesn't provide DEBUG only checks for less than equal
+      // comparison for floats, we can't ensure that probability is less than
+      // 1.0.
     }
 
     ProbabilityInfo(const ProbabilityInfo &other) = default;
@@ -241,7 +244,6 @@ class ProbabilityStore {
     ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
 
     void updateProbability(const float new_probability) {
-      DCHECK_LE(new_probability, 1.0);
       probability_ = new_probability;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index 9d1060f..e13f3e0 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -43,6 +43,8 @@ typedef tmb::TaggedMessage TaggedMessage;
 typedef tmb::client_id client_id;
 typedef tmb::message_type_id message_type_id;
 
+const int kInvalidPriorityLevel = -1;
+
 using ClientIDMap = ThreadIDBasedMap<client_id,
                                      'C',
                                      'l',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5514e009/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 556c984..107576f 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -15,12 +15,18 @@
  *   limitations under the License.
  **/
 
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
 #include <memory>
+#include <random>
+#include <vector>
 
 #include "gtest/gtest.h"
 
 #include "query_execution/Learner.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 
 namespace quickstep {
@@ -199,4 +205,62 @@ TEST_F(LearnerTest, AddCompletionFeedbackMultiplePriorityLevelsTest) {
     }
   }
 }
+
+TEST_F(LearnerTest, HighestPriorityLevelTest) {
+  std::vector<std::size_t> priorities_insertion_order;
+  std::vector<std::size_t> priorities_removal_order;
+  const std::size_t kNumPrioritiesToTest = 20;
+  for (std::size_t priority_num = 1;
+       priority_num <= kNumPrioritiesToTest;
+       ++priority_num) {
+    // Note: Priority level should be non-zero, hence we begin from 1.
+    priorities_insertion_order.emplace_back(priority_num);
+    priorities_removal_order.emplace_back(priority_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g(rd());
+
+  std::shuffle(priorities_insertion_order.begin(),
+               priorities_insertion_order.end(),
+               g);
+
+  std::shuffle(priorities_removal_order.begin(),
+               priorities_removal_order.end(),
+               g);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+
+  std::unique_ptr<QueryHandle> handle;
+  // First insert the queries in the order of priorities as defined by
+  // priorities_insertion_order.
+  for (auto it = priorities_insertion_order.begin();
+       it != priorities_insertion_order.end();
+       ++it) {
+    // Note that the query ID is kept the same as priority level for simplicity.
+    handle.reset(new QueryHandle(*it, *it));
+    learner.addQuery(*handle);
+    const std::size_t max_priority_so_far =
+        *(std::max_element(priorities_insertion_order.begin(), it + 1));
+    EXPECT_EQ(static_cast<int>(max_priority_so_far),
+              learner.getHighestPriorityLevel());
+  }
+  // Now remove the queries in the order of priorities as defined by
+  // priorities_removal_order.
+  for (auto it = priorities_removal_order.begin();
+       it != priorities_removal_order.end();
+       ++it) {
+    // Recall that the query ID is the same as priority level.
+    const std::size_t max_priority_so_far =
+        *(std::max_element(it, priorities_removal_order.end()));
+    EXPECT_EQ(static_cast<int>(max_priority_so_far),
+              learner.getHighestPriorityLevel());
+    learner.removeQuery(*it);
+  }
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
+}
+
 }  // namespace quickstep


[04/11] incubator-quickstep git commit: Get number of active queries (total and by priority level)

Posted by hb...@apache.org.
Get number of active queries (total and by priority level)

- Unit tests to check the feature.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/6183e393
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/6183e393
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/6183e393

Branch: refs/heads/scheduler++
Commit: 6183e3933f5d90ffb23cffcefd1719e79a001727
Parents: d5c6c9d
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Fri Jun 24 10:42:56 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500

----------------------------------------------------------------------
 query_execution/Learner.hpp                | 16 +++++++-
 query_execution/tests/Learner_unittest.cpp | 54 ++++++++++++++++++++-----
 2 files changed, 58 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6183e393/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 9d51877..fb0e4cb 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -102,10 +102,24 @@ class Learner {
   // at this point.
   void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
 
-  inline const std::size_t hasActiveQueries() const {
+  inline const bool hasActiveQueries() const {
     return !query_id_to_priority_lookup_.empty();
   }
 
+  inline const std::size_t getNumActiveQueriesInPriorityLevel(
+      const std::size_t priority_level) const {
+    const auto it = execution_stats_.find(priority_level);
+    if (it != execution_stats_.end()) {
+      return it->second.size();
+    } else {
+      return 0;
+    }
+  }
+
+  inline const std::size_t getTotalNumActiveQueries() const {
+    return query_id_to_priority_lookup_.size();
+  }
+
  private:
   /**
    * @brief Initialize the default probabilities for the queries.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/6183e393/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index cab241a..74353f0 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -24,32 +24,64 @@
 
 namespace quickstep {
 
-TEST(LearnerTest, AddQueryTest) {
+TEST(LearnerTest, AddAndRemoveQueryTest) {
   Learner learner;
   std::unique_ptr<QueryHandle> handle;
-  handle.reset(new QueryHandle(1, 1));
+  const std::size_t kPriorityLevel1 = 1;
+  handle.reset(new QueryHandle(1, kPriorityLevel1));
 
   EXPECT_FALSE(learner.hasActiveQueries());
   learner.addQuery(*handle);
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
   EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel1));
+  EXPECT_FALSE(learner.hasActiveQueries());
+
+  const std::size_t kPriorityLevel2 = 1;
+  handle.reset(new QueryHandle(1, kPriorityLevel2));
+  learner.addQuery(*handle);
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel2));
+  EXPECT_FALSE(learner.hasActiveQueries());
 }
 
-TEST(LearnerTest, RemoveQueryTest) {
+TEST(LearnerTest, MultipleQueriesSamePriorityAddRemoveTest) {
   Learner learner;
-  std::unique_ptr<QueryHandle> handle;
-  handle.reset(new QueryHandle(1, 1));
+  std::unique_ptr<QueryHandle> handle1, handle2;
+  const std::size_t kPriorityLevel = 1;
+  handle1.reset(new QueryHandle(1, kPriorityLevel));
+  handle2.reset(new QueryHandle(2, kPriorityLevel));
 
   EXPECT_FALSE(learner.hasActiveQueries());
-  learner.addQuery(*handle);
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+  learner.addQuery(*handle1);
   EXPECT_TRUE(learner.hasActiveQueries());
-  learner.removeQuery(handle->query_id());
-  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+  learner.addQuery(*handle2);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  EXPECT_EQ(2u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(2u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 
-  handle.reset(new QueryHandle(2, 1));
-  learner.addQuery(*handle);
+  learner.removeQuery(handle1->query_id());
   EXPECT_TRUE(learner.hasActiveQueries());
-  learner.removeQuery(handle->query_id());
+  EXPECT_EQ(1u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(1u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
+
+  learner.removeQuery(handle2->query_id());
+
   EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(0u, learner.getTotalNumActiveQueries());
+  EXPECT_EQ(0u, learner.getNumActiveQueriesInPriorityLevel(kPriorityLevel));
 }
 
 }  // namespace quickstep


[05/11] incubator-quickstep git commit: Created Learner class.

Posted by hb...@apache.org.
Created Learner class.

- Learner keeps track of statistics of concurrent queries
- It maintains the probabilities for individual queries as well as the
  priority levels in the system.
- Changes in ProbabilityStore class including addition of numerator,
  denominator and more unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/75ec7f05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/75ec7f05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/75ec7f05

Branch: refs/heads/scheduler++
Commit: 75ec7f0539285286b73f14a0d774bc76c8bfc5f5
Parents: 1b24694
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 15:54:25 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  11 +
 query_execution/ExecutionStats.hpp              |  18 +
 query_execution/Learner.cpp                     | 195 ++++++++++
 query_execution/Learner.hpp                     | 352 +++++++++++++++++++
 query_execution/PolicyEnforcer.cpp              |   2 +
 query_execution/ProbabilityStore.hpp            | 148 ++++++--
 .../tests/ProbabilityStore_unittest.cpp         |  45 ++-
 7 files changed, 728 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 18ae0da..cb0f815 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -35,6 +35,7 @@ add_library(quickstep_queryexecution_AdmitRequestMessage ../empty_src.cpp AdmitR
 add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionStats.hpp)
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
+add_library(quickstep_queryexecution_Learner Learner.cpp Learner.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
 add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
@@ -93,11 +94,20 @@ target_link_libraries(quickstep_queryexecution_ForemanLite
                       quickstep_threading_Thread
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_queryexecution_Learner
+                      ${GFLAGS_LIB_NAME}
+                      glog
+                      quickstep_queryexecution_ExecutionStats
+                      quickstep_queryexecution_ProbabilityStore
+                      quickstep_queryexecution_QueryExecutionMessages_proto                      
+                      quickstep_queryoptimizer_QueryHandle
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_Learner
                       quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -212,6 +222,7 @@ target_link_libraries(quickstep_queryexecution
                       quickstep_queryexecution_ExecutionStats
                       quickstep_queryexecution_Foreman
                       quickstep_queryexecution_ForemanLite
+                      quickstep_queryexecution_Learner
                       quickstep_queryexecution_PolicyEnforcer
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryContext_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index f28f367..769c7a4 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,6 +58,20 @@ class ExecutionStats {
   }
 
   /**
+   * @brief Check if there are any stats present.
+   **/
+  inline bool hasStats() const {
+    for (auto it = active_operators_.begin();
+         it != active_operators_.end();
+         ++it) {
+      if (!it->second->hasStatsForOperator()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
    * @brief Get the current stats.
    *
    * @note This function updates the cache, hence it can't be const. We are lazy
@@ -145,6 +159,10 @@ class ExecutionStats {
       DCHECK_LE(times_.size(), max_entries_);
     }
 
+    inline bool hasStatsForOperator() const {
+      return !times_.empty();
+    }
+
    private:
     const std::size_t max_entries_;
     std::deque<std::uint64_t> times_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
new file mode 100644
index 0000000..72c68f0
--- /dev/null
+++ b/query_execution/Learner.cpp
@@ -0,0 +1,195 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_execution/Learner.hpp"
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+void Learner::addCompletionFeedback(
+    const serialization::NormalWorkOrderCompletionMessage
+        &workorder_completion_proto) {
+  const std::size_t query_id = workorder_completion_proto.query_id();
+  DCHECK(isQueryPresent(query_id));
+  const std::size_t priority_level = getQueryPriority(query_id);
+  ExecutionStats *execution_stats = getExecutionStats(query_id);
+  DCHECK(execution_stats != nullptr);
+  execution_stats->addEntry(
+      workorder_completion_proto.execution_time_in_microseconds(),
+      workorder_completion_proto.operator_index());
+
+  // updateProbability();
+  if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+    updateFeedbackFromQueriesInPriorityLevel(priority_level);
+  }
+}
+
+void Learner::updateProbabilitiesForQueriesInPriorityLevel(
+    const std::size_t priority_level, const std::size_t query_id) {
+  DCHECK(isPriorityLevelPresent(priority_level));
+  if (execution_stats_[priority_level].empty()) {
+    LOG(INFO) << "Updating probabilities for query ID: " << query_id
+              << " and priority level: " << priority_level
+              << " that has no queries";
+    return;
+  } else if (execution_stats_[priority_level].size() == 1u) {
+    DCHECK(current_probabilities_[priority_level] != nullptr);
+    DCHECK(current_probabilities_[priority_level]->getNumObjects() == 1u);
+    // As we want the probability of the lone query in this priority level as
+    // 1, we set the numerator same as denominator.
+    const std::size_t numerator =
+        current_probabilities_[priority_level]->getDenominator();
+    current_probabilities_[priority_level]->addOrUpdateObject(query_id,
+                                                              numerator);
+    return;
+  }
+  // Else, there is more than one query for the given priority level.
+  std::unordered_map<std::size_t, std::size_t>
+      mean_workorders_per_query =
+          getMeanWorkOrderTimesForQueriesInPriorityLevel(priority_level);
+  const float denominator = calculateDenominator(mean_workorders_per_query);
+  if (denominator != 0) {
+    // Update the numerator for the given query and denominator for all the
+    // queries.
+    DCHECK(mean_workorders_per_query.find(query_id) !=
+           mean_workorders_per_query.end());
+    current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+        query_id, mean_workorders_per_query[query_id], denominator);
+  } else {
+    // At least one of the queries has predicted time for next work order as 0.
+    // In such a case, we don't update the probabilities and continue to use
+    // the older probabilities.
+  }
+}
+
+void Learner::updateProbabilitiesOfAllPriorityLevels(
+    const std::size_t priority_level) {
+  if (!hasFeedbackFromAllPriorityLevels() ||
+      has_feedback_from_all_queries_.empty()) {
+    // Either we don't have enough feedback messages from all the priority
+    // levels OR there are no active queries in the system.
+    return;
+  }
+  // Compute the predicted work order execution times for all the levels.
+  std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
+  std::size_t sum_active_priorities = 0;
+  for (auto priority_iter : has_feedback_from_all_queries_) {
+    std::size_t total_time_curr_level = 0;
+    const std::size_t curr_priority_level = priority_iter.first;
+    sum_active_priorities += curr_priority_level;
+    // For each query, find its predicted work order execution time.
+    const std::unordered_map<std::size_t, std::size_t>
+        mean_workorders_all_queries_curr_level =
+            getMeanWorkOrderTimesForQueriesInPriorityLevel(
+                curr_priority_level);
+    for (auto mean_workorder_entry : mean_workorders_all_queries_curr_level) {
+      total_time_curr_level += mean_workorder_entry.second;
+    }
+    const std::size_t num_queries_in_priority_level =
+        execution_stats_[priority_level].size();
+    DCHECK_GT(num_queries_in_priority_level, 0u);
+    predicted_time_for_level[curr_priority_level] =
+        total_time_curr_level / num_queries_in_priority_level;
+  }
+  DCHECK_GT(sum_active_priorities, 0u);
+  // Now compute the allowable number of work orders for each priority level
+  // that can be executed given a unit total time.
+  // Key = priority level, value = the # of WO mentioned above.
+  std::unordered_map<std::size_t, float> num_workorders_for_level;
+  float total_num_workorders = 0;
+  for (auto predicted_time_iter : predicted_time_for_level) {
+    const std::size_t curr_priority_level = predicted_time_iter.first;
+    const std::size_t num_workorders_for_curr_level =
+        (predicted_time_iter.second == 0)
+            ? 0
+            : static_cast<float>(curr_priority_level) /
+                  sum_active_priorities /
+                  static_cast<float>(predicted_time_iter.second);
+    num_workorders_for_level[curr_priority_level] = num_workorders_for_curr_level;
+    total_num_workorders += num_workorders_for_curr_level;
+  }
+  if (total_num_workorders == 0) {
+    // No priority level can be selected at this point.
+    return;
+  }
+  // Finally, compute the probabilities.
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  for (auto num_workorders_iter : num_workorders_for_level) {
+    priority_levels.emplace_back(num_workorders_iter.first);
+    numerators.emplace_back(num_workorders_iter.second);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, total_num_workorders);
+}
+
+void Learner::initializeDefaultProbabilitiesForAllQueries() {
+  for (auto queries_same_priority_level_iter = execution_stats_.begin();
+       queries_same_priority_level_iter != execution_stats_.end();
+       ++queries_same_priority_level_iter) {
+    std::vector<std::size_t> query_ids;
+    const auto &queries_vector = queries_same_priority_level_iter->second;
+    DCHECK(!queries_vector.empty());
+    for (auto query_iter = queries_vector.cbegin();
+         query_iter != queries_vector.cend();
+         ++query_iter) {
+      query_ids.emplace_back(query_iter->first);
+    }
+    // Numerator for each query is 1.0
+    // The common denominator is number of queries with the given priority level.
+    std::vector<float> numerators(query_ids.size(), 1.0);
+    // Reset the probability store for this level.
+    const std::size_t curr_priority_level =
+        queries_same_priority_level_iter->first;
+    default_probabilities_[curr_priority_level].reset(new ProbabilityStore());
+    default_probabilities_[curr_priority_level]
+        ->addOrUpdateObjectsNewDenominator(
+            query_ids, numerators, query_ids.size());
+  }
+}
+
+void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
+  probabilities_of_priority_levels_.reset(new ProbabilityStore());
+  std::vector<std::size_t> priority_levels;
+  std::vector<float> numerators;
+  float sum_priority_levels = 0;
+  for (auto priority_iter = execution_stats_.cbegin();
+       priority_iter != execution_stats_.cend();
+       ++priority_iter) {
+    sum_priority_levels += priority_iter->second.size();
+    priority_levels.emplace_back(priority_iter->first);
+    numerators.emplace_back(priority_iter->first);
+  }
+  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+      priority_levels, numerators, sum_priority_levels);
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
new file mode 100644
index 0000000..64120a7
--- /dev/null
+++ b/query_execution/Learner.hpp
@@ -0,0 +1,352 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "query_execution/ExecutionStats.hpp"
+#include "query_execution/ProbabilityStore.hpp"
+#include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_optimizer/QueryHandle.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+namespace quickstep {
+
+DEFINE_int32(max_past_entries_learner,
+              10,
+              "The maximum number of past WorkOrder execution statistics"
+              " entries for a query");
+/** \addtogroup QueryExecution
+ *  @{
+ */
+
+class Learner {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  Learner() {
+  }
+
+  void addCompletionFeedback(
+      const serialization::NormalWorkOrderCompletionMessage
+          &workorder_completion_proto);
+
+  void addQuery(const QueryHandle &query_handle) {
+    initializePriorityLevelIfNotPresent(query_handle.query_priority());
+    initializeQuery(query_handle);
+    relearn();
+  }
+
+  /**
+   * @brief Remove a query from the Learner's data structures and relearn.
+   *
+   * @param query_id The ID of the query to be removed. The query must be
+   *        present in the Learner.
+   **/
+  void removeQuery(const std::size_t query_id) {
+    DCHECK(isQueryPresent(query_id));
+    // Look up the priority level before the lookup entry is erased below.
+    const std::size_t priority_level = getQueryPriority(query_id);
+    auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
+    execution_stats_[priority_level].erase(stats_iter_mutable);
+    // NOTE(review): removeObject() expects the query to be in the store for
+    // this level - confirm that it has always been added by this point.
+    current_probabilities_[priority_level]->removeObject(query_id);
+    // Forget the query; otherwise isQueryPresent() and hasActiveQueries() keep
+    // reporting it after its removal.
+    query_id_to_priority_lookup_.erase(query_id);
+    checkAndRemovePriorityLevel(priority_level);
+    relearn();
+  }
+
+  void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+    ExecutionStats *stats = getExecutionStats(query_id);
+    DCHECK(stats != nullptr);
+    stats->removeOperator(operator_id);
+  }
+
+  void relearn() {
+    if (hasActiveQueries()) {
+      initializeDefaultProbabilitiesForAllQueries();
+      initializeDefaultProbabilitiesForPriorityLevels();
+    }
+  }
+
+  void updateProbabilitiesForQueriesInPriorityLevel(
+      const std::size_t priority_level, const std::size_t query_id);
+
+  // TODO(harshad) - Cache internal results from previous invocation of this
+  // function and reuse them. There's a lot of redundancy in computations
+  // at this point.
+  void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
+
+ private:
+  /**
+   * @brief Initialize the default probabilities for the queries.
+   **/
+  void initializeDefaultProbabilitiesForAllQueries();
+
+  /**
+   * @brief Initialize the default probabilities for the priority levels.
+   **/
+  void initializeDefaultProbabilitiesForPriorityLevels();
+
+  /**
+   * @brief Initialize the data structures for a given priority level, if none
+   *        exist. If there are already data structures for the given priority
+   *        level, do nothing.
+   *
+   * @param priority_level The priority level.
+   **/
+  inline void initializePriorityLevelIfNotPresent(
+      const std::size_t priority_level) {
+    if (isPriorityLevelPresent(priority_level)) {
+      // The level is already known; leave its data structures untouched.
+      return;
+    }
+    current_probabilities_[priority_level].reset(new ProbabilityStore());
+    // The store for the priority levels is otherwise only created while
+    // relearning, so it may not exist when the first level is added.
+    if (probabilities_of_priority_levels_ == nullptr) {
+      probabilities_of_priority_levels_.reset(new ProbabilityStore());
+    }
+    // Register the level with a placeholder numerator of 0.
+    // TODO(harshad) - Calculate the default probability for the priority level
+    // here and use it instead.
+    probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+    // Create the (empty) vector of execution statistics for this level.
+    execution_stats_[priority_level];
+  }
+
+  /**
+   * @brief First check if the priority level needs to be removed from the local
+   *        data structures and remove if needed.
+   *
+   * @param priority_level The priority level.
+   **/
+  inline void checkAndRemovePriorityLevel(const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    if (execution_stats_[priority_level].empty()) {
+      execution_stats_.erase(priority_level);
+      current_probabilities_.erase(priority_level);
+      probabilities_of_priority_levels_->removeObject(priority_level);
+      has_feedback_from_all_queries_.erase(priority_level);
+    }
+  }
+
+  /**
+   * @brief Check if the Learner has presence of the given priority level.
+   **/
+  inline bool isPriorityLevelPresent(const std::size_t priority_level) const {
+    DCHECK_EQ((current_probabilities_.find(priority_level) ==
+               current_probabilities_.end()),
+              execution_stats_.find(priority_level) == execution_stats_.end());
+    return (execution_stats_.find(priority_level) != execution_stats_.end());
+  }
+
+  /**
+   * @brief Check if the query is present.
+   **/
+  inline bool isQueryPresent(const std::size_t query_id) const {
+    return query_id_to_priority_lookup_.find(query_id) !=
+           query_id_to_priority_lookup_.end();
+  }
+
+  /**
+   * @brief Initialize all the data structures for a new query.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
+  void initializeQuery(const QueryHandle &query_handle) {
+    const std::size_t priority_level = query_handle.query_priority();
+    const std::size_t query_id = query_handle.query_id();
+    DCHECK(isPriorityLevelPresent(priority_level));
+    query_id_to_priority_lookup_[query_id] = priority_level;
+    execution_stats_[priority_level].emplace_back(
+        query_id,
+        std::unique_ptr<ExecutionStats>(
+            new ExecutionStats(FLAGS_max_past_entries_learner)));
+    // As we are initializing the query, we obviously haven't gotten any
+    // feedback message for this query. Hence mark the following field as false.
+    has_feedback_from_all_queries_[priority_level] = false;
+  }
+
+  /**
+   * @brief Get the execution stats object for the given query.
+   *
+   * @return A pointer to the ExecutionStats for the query. If not present,
+   *         returns NULL.
+   **/
+  inline ExecutionStats* getExecutionStats(const std::size_t query_id) {
+    if (isQueryPresent(query_id)) {
+      const auto stats_iter = getExecutionStatsIterMutable(query_id);
+      DCHECK(stats_iter !=
+             std::end(execution_stats_[getQueryPriority(query_id)]));
+      return stats_iter->second.get();
+    }
+    return nullptr;
+  }
+
+  /**
+   * @brief This function works well when the query and priority level exists
+   *        in the data structures.
+   *
+   **/
+  inline std::vector<
+      std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>::const_iterator
+      getExecutionStatsIterMutable(const std::size_t query_id) {
+    const std::size_t priority_level = getQueryPriority(query_id);
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_[priority_level];
+    // The following line uses std::find_if to reach to the desired element
+    // in the stats_vector.
+    auto stats_iter = std::find_if(
+        stats_vector.begin(),
+        stats_vector.end(),
+        [&query_id](
+            const std::pair<std::size_t, std::unique_ptr<ExecutionStats>> &p) {
+          return p.first == query_id;
+        });
+    return stats_iter;
+  }
+
+  inline const std::size_t getQueryPriority(const std::size_t query_id) const {
+    const auto it = query_id_to_priority_lookup_.find(query_id);
+    DCHECK(it != query_id_to_priority_lookup_.end());
+    return it->second;
+  }
+
+  /**
+   * @brief Check if we have received at least one feedback message from all the
+   *        queries in the given priority level.
+   **/
+  inline bool hasFeedbackFromAllQueriesInPriorityLevel(
+      const std::size_t priority_level) const {
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_.at(priority_level);
+    for (std::size_t i = 0; i < stats_vector.size(); ++i) {
+      DCHECK(stats_vector[i].second != nullptr);
+      if (!stats_vector[i].second->hasStats()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  inline void updateFeedbackFromQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
+        &stats_vector = execution_stats_.at(priority_level);
+    for (std::size_t i = 0; i < stats_vector.size(); ++i) {
+      DCHECK(stats_vector[i].second != nullptr);
+      if (!stats_vector[i].second->hasStats()) {
+        // At least one query has no statistics so far.
+        return;
+      }
+    }
+    // All the queries have at least one execution statistic.
+    has_feedback_from_all_queries_[priority_level] = true;
+  }
+
+  /**
+   * @brief Check if at least one query is currently registered with the
+   *        Learner.
+   *
+   * @return True if the query ID to priority level lookup is non-empty.
+   **/
+  inline bool hasActiveQueries() const {
+    return !query_id_to_priority_lookup_.empty();
+  }
+
+  /**
+   * @brief Get the mean work order execution times for all the queries in
+   *        a given priority level.
+   *
+   * @param priority_level The priority level.
+   *
+   * @return An unordered_map in which: Key = query ID.
+   *         Value = Mean time per work order for that query.
+   **/
+  inline std::unordered_map<std::size_t, std::size_t>
+  getMeanWorkOrderTimesForQueriesInPriorityLevel(
+      const std::size_t priority_level) {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    std::unordered_map<std::size_t, std::size_t> result;
+    for (auto it = execution_stats_[priority_level].begin();
+         it != execution_stats_[priority_level].end();
+         ++it) {
+      DCHECK(it->second.get() != nullptr);
+      auto query_stats = it->second->getCurrentStats();
+      result[it->first] =
+          query_stats.second == 0 ? 0 : query_stats.first / query_stats.second;
+    }
+    return result;
+  }
+
+  /**
+   * @brief Sum the reciprocals of the mean work order times of the given
+   *        queries.
+   *
+   * @param mean_workorder_per_query An unordered_map in which:
+   *        Key = query ID.
+   *        Value = Mean time per work order for that query.
+   *
+   * @note If any query has mean work order time as 0, we return 0 as the
+   *       denominator.
+   *
+   * @return The denominator to be used for probability calculations.
+   **/
+  inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
+                                        &mean_workorder_per_query) const {
+    float denominator = 0;
+    for (const auto &element : mean_workorder_per_query) {
+      if (element.second != 0) {
+        denominator += 1/static_cast<float>(element.second);
+      } else {
+        return 0;
+      }
+    }
+    return denominator;
+  }
+
+  inline bool hasFeedbackFromAllPriorityLevels() const {
+    for (auto feedback : has_feedback_from_all_queries_) {
+      if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Key = Priority level, value = A vector of pairs.
+  // Each pair:
+  // 1st element: Query ID.
+  // 2nd Element: Execution statistics for the query.
+  std::unordered_map<
+      std::size_t,
+      std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>>
+      execution_stats_;
+
+  // Key = query ID, value = priority level for the query ID.
+  std::unordered_map<std::size_t, std::size_t> query_id_to_priority_lookup_;
+
+  // Key = priority level, value = ProbabilityStore for the queries belonging to
+  // that priority level.
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      current_probabilities_;
+
+  // Key = priority level, value = ProbabilityStore holding the default
+  // probabilities of the queries belonging to that priority level.
+  std::unordered_map<std::size_t, std::unique_ptr<ProbabilityStore>>
+      default_probabilities_;
+
+  // ProbabilityStore for probabilities mapped to the priority levels.
+  std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
+
+  // Key = priority level. Value = A boolean that indicates if we have received
+  // feedback from all the queries in the given priority level.
+  // TODO(harshad) - Invalidate the cache whenever needed.
+  std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+
+  DISALLOW_COPY_AND_ASSIGN(Learner);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_LEARNER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index db7206b..ff734ca 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/Learner.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
@@ -42,6 +43,7 @@ DEFINE_uint64(max_msgs_per_dispatch_round, 20, "Maximum number of messages that"
               " the workers.");
 
 bool PolicyEnforcer::admitQuery(QueryHandle *query_handle) {
+  Learner learner;
   if (admitted_queries_.size() < kMaxConcurrentQueries) {
     // Ok to admit the query.
     const std::size_t query_id = query_handle->query_id();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 8343d24..d31caa6 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -22,6 +22,7 @@
 #include <cstddef>
 #include <random>
 #include <unordered_map>
+#include <utility>
 #include <vector>
 
 #include "utility/Macros.hpp"
@@ -40,7 +41,7 @@ class ProbabilityStore {
    * @brief Constructor.
    **/
   ProbabilityStore()
-      : mt_(std::random_device()()) {}
+      : common_denominator_(1.0), mt_(std::random_device()()) {}
 
   /**
    * @brief Get the number of objects in the store.
@@ -50,6 +51,10 @@ class ProbabilityStore {
     return individual_probabilities_.size();
   }
 
+  /**
+   * @brief Get the common denominator currently used by the store.
+   *
+   * @note The denominator is stored as a float, so it is returned as a float
+   *       to avoid truncating fractional values.
+   **/
+  inline float getDenominator() const {
+    return common_denominator_;
+  }
+
   /**
    * @brief Add individual (not cumulative) probability for a given object.
    *
@@ -59,16 +64,48 @@ class ProbabilityStore {
    * @note This function may override previously written probability values.
    *
    * @param property The property of the given object.
-   * @param individual_probability The individual (not cumulative) probability
-   *        of the given object.
+   * @param numerator The numerator for the given object.
    **/
-  void addProbability(const std::size_t property,
-                      const float individual_probability) {
-    individual_probabilities_[property] = individual_probability;
+  void addOrUpdateObject(const std::size_t property,
+                         const float numerator) {
+    DCHECK_LE(numerator, common_denominator_);
+    // We should have the correct individual probability in
+    // individual_probabilities_ for the newly added object at this point.
+    // Because we rely on the probabilities for all the objects in
+    // updateCumulativeProbabilities().
+    individual_probabilities_[property] =
+        std::make_pair(numerator, numerator / common_denominator_);
     updateCumulativeProbabilities();
   }
 
   /**
+   * @brief Add individual (not cumulative) probability for a given object with
+   *        updated denominator.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param property The property of the given object.
+   * @param numerator The numerator for the given object.
+   * @param new_denominator The updated denominator for the store.
+   **/
+  void addOrUpdateObjectNewDenominator(const std::size_t property,
+                                       const float numerator,
+                                       const float new_denominator) {
+    CHECK_GT(new_denominator, 0u);
+    DCHECK_LE(numerator, new_denominator);
+    common_denominator_ = new_denominator;
+    // It is alright to not have the correct probability in
+    // individual_probabilities_ for the newly added object at this point.
+    // Because we compute the probabilities for all the objects in
+    // updateProbabilitiesNewDenominator().
+    individual_probabilities_[property] = std::make_pair(numerator, 0.0);
+    updateProbabilitiesNewDenominator();
+  }
+
+  /**
    * @brief Add individual (not cumulative) probabilities for given objects.
    *
    * @note This function leaves the cumulative probabilities in a consistent
@@ -77,30 +114,40 @@ class ProbabilityStore {
    * @note This function may override previously written probability values.
    *
    * @param properties A vector of properties to be added.
-   * @param individual_probabilities The individual (not cumulative)
-   *        probabilities of the given objects.
+   * @param numerators The numerators of the given objects.
    **/
-  void addProbabilities(const std::vector<std::size_t> &properties,
-                        const std::vector<float> &individual_probabilities) {
-    DCHECK_EQ(properties.size(), individual_probabilities.size());
+  void addOrUpdateObjects(const std::vector<std::size_t> &properties,
+                          const std::vector<float> &numerators) {
+    DCHECK_EQ(properties.size(), numerators.size());
     for (std::size_t i = 0; i < properties.size(); ++i) {
-      individual_probabilities_[properties[i]] = individual_probabilities[i];
+      DCHECK_LE(numerators[i], common_denominator_);
+      // We should have the correct individual probability in
+      // individual_probabilities_ for the newly added object at this point.
+      // Because we rely on the probabilities for all the objects in
+      // updateCumulativeProbabilities().
+      individual_probabilities_[properties[i]] =
+          std::make_pair(numerators[i], numerators[i] / common_denominator_);
     }
     updateCumulativeProbabilities();
   }
 
-  /**
-   * @brief Update  the probability of a given object to a new value.
-   *
-   * @param property The property of the object.
-   * @param new_individual_probability The new probability to be set.
-   **/
-  void updateProbability(const std::size_t property,
-                         const float new_individual_probability) {
-    auto it = individual_probabilities_.find(property);
-    DCHECK(it != individual_probabilities_.end());
-    it->second = new_individual_probability;
-    updateCumulativeProbabilities();
+  void addOrUpdateObjectsNewDenominator(
+      const std::vector<std::size_t> &properties,
+      const std::vector<float> &numerators,
+      const float new_denominator) {
+    CHECK_GT(new_denominator, 0u);
+    DCHECK_EQ(properties.size(), numerators.size());
+    common_denominator_ = new_denominator;
+    for (std::size_t i = 0; i < properties.size(); ++i) {
+      DCHECK_LE(numerators[i], common_denominator_);
+      // It is alright to not have the correct probability in
+      // individual_probabilities_ for the newly added object at this point.
+      // Because we compute the probabilities for all the objects in
+      // updateProbabilitiesNewDenominator().
+      individual_probabilities_[properties[i]] =
+          std::make_pair(numerators[i], 0.0);
+    }
+    updateProbabilitiesNewDenominator();
   }
 
   /**
@@ -109,10 +156,24 @@ class ProbabilityStore {
    * @param property The property of the object to be removed.
    **/
   void removeObject(const std::size_t property) {
-    auto it = individual_probabilities_.find(property);
-    DCHECK(it != individual_probabilities_.end());
-    individual_probabilities_.erase(it);
-    updateCumulativeProbabilities();
+    auto individual_it = individual_probabilities_.find(property);
+    DCHECK(individual_it != individual_probabilities_.end());
+    individual_probabilities_.erase(individual_it);
+    if (!individual_probabilities_.empty()) {
+      float new_denominator = 0;
+      for (auto it = individual_probabilities_.begin();
+           it != individual_probabilities_.end();
+           ++it) {
+        new_denominator += it->second.first;
+      }
+      CHECK_GT(new_denominator, 0);
+      common_denominator_ = new_denominator;
+      updateCumulativeProbabilities();
+    } else {
+      // In order to keep the store consistent, we should keep the sizes of
+      // individual_probabilities_ and cumulative_probabilities_ the same.
+      cumulative_probabilities_.clear();
+    }
   }
 
   /**
@@ -123,7 +184,7 @@ class ProbabilityStore {
   const float getIndividualProbability(const std::size_t property) const {
     const auto it = individual_probabilities_.find(property);
     DCHECK(it != individual_probabilities_.end());
-    return it->second;
+    return it->second.second;
   }
 
   /**
@@ -141,13 +202,13 @@ class ProbabilityStore {
       return;
     }
     float cumulative_probability = 0;
-    for (const auto property_probability_pair : individual_probabilities_) {
-      cumulative_probabilities_.emplace_back(property_probability_pair.first,
+    for (const auto p : individual_probabilities_) {
+      cumulative_probabilities_.emplace_back(p.first,
                                              cumulative_probability);
-      cumulative_probability += property_probability_pair.second;
+      cumulative_probability += p.second.second;
     }
-    // Adjust the last cumulative probability manually to 1.0, so that floating
-    // addition related rounding issues are ignored.
+    // Adjust the last cumulative probability manually to 1.0, so that
+    // floating addition related rounding issues are ignored.
     cumulative_probabilities_.back().updateProbability(1.0);
   }
 
@@ -208,9 +269,26 @@ class ProbabilityStore {
     return it->property_;
   }
 
-  std::unordered_map<std::size_t, float> individual_probabilities_;
+  inline void updateProbabilitiesNewDenominator() {
+    // First update the individual probabilities.
+    for (auto it = individual_probabilities_.begin();
+         it != individual_probabilities_.end();
+         ++it) {
+      DCHECK_LE(it->second.first, common_denominator_);
+      it->second.second = it->second.first / common_denominator_;
+    }
+    updateCumulativeProbabilities();
+  }
+
+  // Key = property of the object.
+  // Value = A pair ...
+  // 1st element: Numerator of the object.
+  // 2nd element: Individual probability of the object.
+  std::unordered_map<std::size_t, std::pair<float, float>> individual_probabilities_;
   std::vector<ProbabilityInfo> cumulative_probabilities_;
 
+  float common_denominator_;
+
   std::mt19937_64 mt_;
 
   DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/75ec7f05/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
index e624557..dcec1e5 100644
--- a/query_execution/tests/ProbabilityStore_unittest.cpp
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -28,14 +28,15 @@ TEST(ProbabilityStoreTest, CountTest) {
   ProbabilityStore store;
   EXPECT_EQ(0u, store.getNumObjects());
   const std::size_t kProperty = 0;
-  store.addProbability(kProperty, 0.5);
+  store.addOrUpdateObject(kProperty, 1);
   EXPECT_EQ(1u, store.getNumObjects());
   store.removeObject(kProperty);
   EXPECT_EQ(0u, store.getNumObjects());
 
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   EXPECT_EQ(objects.size(), store.getNumObjects());
 }
@@ -43,11 +44,12 @@ TEST(ProbabilityStoreTest, CountTest) {
 TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
-    EXPECT_EQ(probabilities[object_num],
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
               store.getIndividualProbability(objects[object_num]));
   }
 }
@@ -55,8 +57,9 @@ TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
 TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
   ProbabilityStore store;
   std::vector<std::size_t> objects {3, 5, 7, 9};
-  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
-  store.addProbabilities(objects, probabilities);
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
 
   const std::size_t kNumTrials = 10;
   while (!objects.empty()) {
@@ -72,4 +75,30 @@ TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
   }
 }
 
+TEST(ProbabilityStoreTest, RemoveObjectTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  std::vector<float> numerators {1, 2, 3, 5};
+  const std::size_t kNewDenominator = 10;
+  store.addOrUpdateObjectsNewDenominator(objects, numerators, kNewDenominator);
+
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+              store.getIndividualProbability(objects[object_num]));
+  }
+
+  // Remove last object "9", with numerator 5.
+  store.removeObject(objects.back());
+  objects.pop_back();
+  numerators.pop_back();
+  const float expected_new_denominator =
+      std::accumulate(numerators.begin(), numerators.end(), 0);
+
+  EXPECT_EQ(expected_new_denominator, store.getDenominator());
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_EQ(numerators[object_num] / static_cast<float>(kNewDenominator),
+              store.getIndividualProbability(objects[object_num]));
+  }
+}
+
 }  // namespace quickstep



[06/11] incubator-quickstep git commit: Created a class for storing probabilities.

Posted by hb...@apache.org.
Created a class for storing probabilities.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1b24694b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1b24694b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1b24694b

Branch: refs/heads/scheduler++
Commit: 1b24694bf99665d7e151d8ded751a258de000276
Parents: 8e973b8
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Tue Jun 21 11:45:46 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt                  |  13 ++
 query_execution/PolicyEnforcer.cpp              |   1 +
 query_execution/ProbabilityStore.hpp            | 223 +++++++++++++++++++
 .../tests/ProbabilityStore_unittest.cpp         |  75 +++++++
 4 files changed, 312 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index fcd4f48..18ae0da 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -36,6 +36,7 @@ add_library(quickstep_queryexecution_ExecutionStats ../empty_src.cpp ExecutionSt
 add_library(quickstep_queryexecution_Foreman Foreman.cpp Foreman.hpp)
 add_library(quickstep_queryexecution_ForemanLite ../empty_src.cpp ForemanLite.hpp)
 add_library(quickstep_queryexecution_PolicyEnforcer PolicyEnforcer.cpp PolicyEnforcer.hpp)
+add_library(quickstep_queryexecution_ProbabilityStore ../empty_src.cpp ProbabilityStore.hpp)
 add_library(quickstep_queryexecution_QueryContext QueryContext.cpp QueryContext.hpp)
 add_library(quickstep_queryexecution_QueryContext_proto
             ${queryexecution_QueryContext_proto_srcs}
@@ -97,6 +98,7 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       glog
                       quickstep_queryexecution_ExecutionStats
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_ProbabilityStore
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
                       quickstep_queryexecution_QueryManager
@@ -106,6 +108,9 @@ target_link_libraries(quickstep_queryexecution_PolicyEnforcer
                       quickstep_relationaloperators_WorkOrder
                       quickstep_utility_Macros
                       tmb)
+target_link_libraries(quickstep_queryexecution_ProbabilityStore
+                      glog
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryexecution_QueryContext
                       glog
                       quickstep_catalog_CatalogDatabaseLite
@@ -252,6 +257,14 @@ if (ENABLE_DISTRIBUTED)
   add_test(BlockLocator_unittest BlockLocator_unittest)
 endif()
 
+add_executable(ProbabilityStore_unittest
+  "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
+target_link_libraries(ProbabilityStore_unittest
+                      gtest
+                      gtest_main
+                      quickstep_queryexecution_ProbabilityStore)
+add_test(ProbabilityStore_unittest ProbabilityStore_unittest) 
+
 add_executable(QueryManager_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/QueryManager_unittest.cpp")
 target_link_libraries(QueryManager_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/PolicyEnforcer.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcer.cpp b/query_execution/PolicyEnforcer.cpp
index 84aa86a..db7206b 100644
--- a/query_execution/PolicyEnforcer.cpp
+++ b/query_execution/PolicyEnforcer.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryManager.hpp"
 #include "query_execution/WorkerDirectory.hpp"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
new file mode 100644
index 0000000..8343d24
--- /dev/null
+++ b/query_execution/ProbabilityStore.hpp
@@ -0,0 +1,223 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_
+
+#include <algorithm>
+#include <cstddef>
+#include <random>
+#include <unordered_map>
+#include <vector>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/**
+ * @brief A class that stores the probabilities of objects. We use a field
+ *        called "property" to identify each object.
+ **/
+class ProbabilityStore {
+ public:
+  /**
+   * @brief Constructor.
+   **/
+  ProbabilityStore()
+      : mt_(std::random_device()()) {}
+
+  /**
+   * @brief Get the number of objects in the store.
+   **/
+  const std::size_t getNumObjects() const {
+    DCHECK_EQ(individual_probabilities_.size(), cumulative_probabilities_.size());
+    return individual_probabilities_.size();
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probability for a given object.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param property The property of the given object.
+   * @param individual_probability The individual (not cumulative) probability
+   *        of the given object.
+   **/
+  void addProbability(const std::size_t property,
+                      const float individual_probability) {
+    individual_probabilities_[property] = individual_probability;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Add individual (not cumulative) probabilities for given objects.
+   *
+   * @note This function leaves the cumulative probabilities in a consistent
+   *       state. An alternative lazy implementation should be written if cost
+   *       of calculating cumulative probabilities is high.
+   * @note This function may override previously written probability values.
+   *
+   * @param properties A vector of properties to be added.
+   * @param individual_probabilities The individual (not cumulative)
+   *        probabilities of the given objects.
+   **/
+  void addProbabilities(const std::vector<std::size_t> &properties,
+                        const std::vector<float> &individual_probabilities) {
+    DCHECK_EQ(properties.size(), individual_probabilities.size());
+    for (std::size_t i = 0; i < properties.size(); ++i) {
+      individual_probabilities_[properties[i]] = individual_probabilities[i];
+    }
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Update the probability of a given object to a new value.
+   *
+   * @param property The property of the object.
+   * @param new_individual_probability The new probability to be set.
+   **/
+  void updateProbability(const std::size_t property,
+                         const float new_individual_probability) {
+    auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    it->second = new_individual_probability;
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Remove an object from the store.
+   *
+   * @param property The property of the object to be removed.
+   **/
+  void removeObject(const std::size_t property) {
+    auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    individual_probabilities_.erase(it);
+    updateCumulativeProbabilities();
+  }
+
+  /**
+   * @brief Get the individual probability (not cumulative) for an object.
+   *
+   * @param property The property of the object.
+   **/
+  const float getIndividualProbability(const std::size_t property) const {
+    const auto it = individual_probabilities_.find(property);
+    DCHECK(it != individual_probabilities_.end());
+    return it->second;
+  }
+
+  /**
+   * @brief Update the cumulative probabilities.
+   *
+   * @note This function should be called whenever there are any updates,
+   *       additions or deletions to the individual probabilities.
+   * @note An efficient implementation should be written if there are large
+   *       number of objects.
+   **/
+  void updateCumulativeProbabilities() {
+    cumulative_probabilities_.clear();
+    if (individual_probabilities_.empty()) {
+      return;  // Nothing to accumulate; the cumulative vector stays empty.
+    }
+    float cumulative_probability = 0;
+    for (const auto &entry : individual_probabilities_) {
+      // Accumulate first: storing the inclusive sum gives every object (also
+      // the first) its own interval. Clamp, as rounding may overshoot 1.0.
+      cumulative_probability += entry.second;
+      cumulative_probabilities_.emplace_back(
+          entry.first, std::min(cumulative_probability, 1.0f));
+    }
+    // Pin the last entry to 1.0 so floating-point rounding leaves no gap.
+    cumulative_probabilities_.back().updateProbability(1.0);
+  }
+
+  /**
+   * @brief Return a randomly chosen property.
+   *
+   * @note The random number is uniformly chosen.
+   **/
+  inline const std::size_t pickRandomProperty() {
+    std::uniform_real_distribution<float> dist(0.0, 1.0);
+    const float chosen_probability = dist(mt_);
+    return getPropertyForProbability(chosen_probability);
+  }
+
+ private:
+  class ProbabilityInfo {
+   public:
+    ProbabilityInfo(const std::size_t property, const float probability)
+        : property_(property), probability_(probability) {
+      DCHECK_LE(probability, 1.0);
+    }
+
+    ProbabilityInfo(const ProbabilityInfo &other) = default;
+
+    ProbabilityInfo& operator=(const ProbabilityInfo &other) = default;
+
+    void updateProbability(const float new_probability) {
+      DCHECK_LE(new_probability, 1.0);
+      probability_ = new_probability;
+    }
+
+    std::size_t property_;
+    float probability_;
+  };
+
+  /**
+   * @brief Get a property for a given cumulative probability.
+   *
+   * @param key_cumulative_probability The input cumulative probability.
+   *
+   * @return The object that has a cumulative probability that is greater than
+   *         or equal to the input cumulative probability.
+   **/
+  inline const std::size_t getPropertyForProbability(
+      const float key_cumulative_probability) {
+    DCHECK(!cumulative_probabilities_.empty());
+    // The last entry is pinned to 1.0, so a key of exactly 1.0 has no strictly
+    // greater entry; return the last object instead of dereferencing end().
+    ProbabilityInfo search_key(0, key_cumulative_probability);
+    const auto it = std::upper_bound(
+        cumulative_probabilities_.begin(),
+        cumulative_probabilities_.end(),
+        search_key,
+        [](const ProbabilityInfo &a, const ProbabilityInfo &b) {
+          return a.probability_ < b.probability_;
+        });
+    return (it != cumulative_probabilities_.end())
+        ? it->property_ : cumulative_probabilities_.back().property_;
+  }
+
+  std::unordered_map<std::size_t, float> individual_probabilities_;
+  std::vector<ProbabilityInfo> cumulative_probabilities_;
+
+  std::mt19937_64 mt_;
+
+  DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_EXECUTION_PROBABILITY_STORE_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1b24694b/query_execution/tests/ProbabilityStore_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/ProbabilityStore_unittest.cpp b/query_execution/tests/ProbabilityStore_unittest.cpp
new file mode 100644
index 0000000..e624557
--- /dev/null
+++ b/query_execution/tests/ProbabilityStore_unittest.cpp
@@ -0,0 +1,75 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <cstddef>
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/ProbabilityStore.hpp"
+
+namespace quickstep {
+
+TEST(ProbabilityStoreTest, CountTest) {
+  ProbabilityStore store;
+  EXPECT_EQ(0u, store.getNumObjects());
+  const std::size_t kProperty = 0;
+  store.addProbability(kProperty, 0.5);
+  EXPECT_EQ(1u, store.getNumObjects());
+  store.removeObject(kProperty);
+  EXPECT_EQ(0u, store.getNumObjects());
+
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(objects, probabilities);
+
+  EXPECT_EQ(objects.size(), store.getNumObjects());
+}
+
+TEST(ProbabilityStoreTest, IndividualProbabilityTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(objects, probabilities);
+
+  for (std::size_t object_num = 0; object_num < objects.size(); ++object_num) {
+    EXPECT_EQ(probabilities[object_num],
+              store.getIndividualProbability(objects[object_num]));
+  }
+}
+
+TEST(ProbabilityStoreTest, PickRandomPropertyTest) {
+  ProbabilityStore store;
+  std::vector<std::size_t> objects {3, 5, 7, 9};
+  std::vector<float> probabilities {0.2, 0.3, 0.4, 0.1};
+  store.addProbabilities(objects, probabilities);
+
+  const std::size_t kNumTrials = 10;
+  while (!objects.empty()) {
+    for (std::size_t trial_num = 0; trial_num < kNumTrials; ++trial_num) {
+      const std::size_t picked_property = store.pickRandomProperty();
+      const auto it = std::find(objects.begin(), objects.end(), picked_property);
+      EXPECT_TRUE(it != objects.end());
+    }
+    const std::size_t property_to_be_removed = objects.back();
+    store.removeObject(property_to_be_removed);
+    objects.pop_back();
+    EXPECT_EQ(objects.size(), store.getNumObjects());
+  }
+}
+
+}  // namespace quickstep


[02/11] incubator-quickstep git commit: Created unit test for Learner

Posted by hb...@apache.org.
Created unit test for Learner

- API changes for probability store.
- Check if there's a probability entry for the query to be removed.
- Bug fix in remove query.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d5c6c9dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d5c6c9dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d5c6c9dd

Branch: refs/heads/scheduler++
Commit: d5c6c9ddbf3b2eece2952b2fe2c98960893e544a
Parents: 75ec7f0
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Thu Jun 23 23:03:33 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:01 2016 -0500

----------------------------------------------------------------------
 query_execution/CMakeLists.txt             |  9 ++++
 query_execution/Learner.cpp                |  6 ++-
 query_execution/Learner.hpp                | 33 ++++++++++-----
 query_execution/ProbabilityStore.hpp       |  5 +++
 query_execution/tests/Learner_unittest.cpp | 55 +++++++++++++++++++++++++
 5 files changed, 96 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index cb0f815..3904185 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -268,6 +268,15 @@ if (ENABLE_DISTRIBUTED)
   add_test(BlockLocator_unittest BlockLocator_unittest)
 endif()
 
+add_executable(Learner_unittest
+  "${CMAKE_CURRENT_SOURCE_DIR}/tests/Learner_unittest.cpp")
+target_link_libraries(Learner_unittest
+                      gtest
+                      gtest_main
+                      quickstep_queryexecution_Learner
+                      quickstep_queryoptimizer_QueryHandle)
+add_test(Learner_unittest Learner_unittest) 
+
 add_executable(ProbabilityStore_unittest
   "${CMAKE_CURRENT_SOURCE_DIR}/tests/ProbabilityStore_unittest.cpp")
 target_link_libraries(ProbabilityStore_unittest

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 72c68f0..5d877b4 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -188,8 +188,10 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
     priority_levels.emplace_back(priority_iter->first);
     numerators.emplace_back(priority_iter->first);
   }
-  probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
-      priority_levels, numerators, sum_priority_levels);
+  if (sum_priority_levels > 0) {
+    probabilities_of_priority_levels_->addOrUpdateObjectsNewDenominator(
+        priority_levels, numerators, sum_priority_levels);
+  }
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index 64120a7..9d51877 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -35,10 +35,10 @@
 
 namespace quickstep {
 
-DEFINE_int32(max_past_entries_learner,
+/*DECLARE_int32(max_past_entries_learner,
               10,
               "The maximum number of past WorkOrder execution statistics"
-              " entries for a query");
+              " entries for a query");*/
 /** \addtogroup QueryExecution
  *  @{
  */
@@ -49,6 +49,7 @@ class Learner {
    * @brief Constructor.
    **/
   Learner() {
+    probabilities_of_priority_levels_.reset(new ProbabilityStore());
   }
 
   void addCompletionFeedback(
@@ -67,7 +68,15 @@ class Learner {
     const std::size_t priority_level = getQueryPriority(query_id);
     auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
     execution_stats_[priority_level].erase(stats_iter_mutable);
-    current_probabilities_[priority_level]->removeObject(query_id);
+    DCHECK(current_probabilities_.find(priority_level) !=
+           current_probabilities_.end());
+    if (current_probabilities_[priority_level]->hasObject(query_id)) {
+      // We may have cases when a query doesn't produce any feedback message,
+      // therefore we may not have an entry for this query in the
+      // current_probabilities_[priority_level] ProbabilityStore.
+      current_probabilities_[priority_level]->removeObject(query_id);
+    }
+    query_id_to_priority_lookup_.erase(query_id);
     checkAndRemovePriorityLevel(priority_level);
     relearn();
   }
@@ -93,6 +102,10 @@ class Learner {
   // at this point.
   void updateProbabilitiesOfAllPriorityLevels(const std::size_t priority_level);
 
+  inline const bool hasActiveQueries() const {  // True iff a query is tracked.
+    return !query_id_to_priority_lookup_.empty();
+  }
+
  private:
   /**
    * @brief Initialize the default probabilities for the queries.
@@ -111,12 +124,15 @@ class Learner {
    **/
   inline void initializePriorityLevelIfNotPresent(
       const std::size_t priority_level) {
-    if (isPriorityLevelPresent(priority_level)) {
+    if (!isPriorityLevelPresent(priority_level)) {
       current_probabilities_[priority_level].reset(new ProbabilityStore());
       // Calculate the default probability for the priority level here and use
       // it instead of 0.5 here.
       // TODO(harshad) - Correct this.
-      probabilities_of_priority_levels_->addOrUpdateObject(priority_level, 0);
+      /*const float new_denominator =
+          probabilities_of_priority_levels_[priority_level]->getDenominator();
+      probabilities_of_priority_levels_->addOrUpdateObjectNewDenominator(
+          priority_level, priority_level, new_denominator);*/
       execution_stats_[priority_level];
     }
   }
@@ -168,7 +184,8 @@ class Learner {
     execution_stats_[priority_level].emplace_back(
         query_id,
         std::unique_ptr<ExecutionStats>(
-            new ExecutionStats(FLAGS_max_past_entries_learner)));
+            // new ExecutionStats(FLAGS_max_past_entries_learner)));
+            new ExecutionStats(10)));
     // As we are initializing the query, we obviously haven't gotten any
     // feedback message for this query. Hence mark the following field as false.
     has_feedback_from_all_queries_[priority_level] = false;
@@ -251,10 +268,6 @@ class Learner {
     has_feedback_from_all_queries_[priority_level] = true;
   }
 
-  inline const std::size_t hasActiveQueries() const {
-    return !query_id_to_priority_lookup_.empty();
-  }
-
   /**
    * @brief Get the mean work order execution times for all the queries in
    *        a given priority level.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index d31caa6..347df89 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -55,6 +55,11 @@ class ProbabilityStore {
     return common_denominator_;
   }
 
+  inline bool hasObject(const std::size_t property) const {
+    auto it = individual_probabilities_.find(property);
+    return (it != individual_probabilities_.end());
+  }
+
   /**
    * @brief Add individual (not cumulative) probability for a given object.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d5c6c9dd/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
new file mode 100644
index 0000000..cab241a
--- /dev/null
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -0,0 +1,55 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include <memory>
+
+#include "gtest/gtest.h"
+
+#include "query_execution/Learner.hpp"
+#include "query_optimizer/QueryHandle.hpp"
+
+namespace quickstep {
+
+TEST(LearnerTest, AddQueryTest) {
+  Learner learner;
+  std::unique_ptr<QueryHandle> handle;
+  handle.reset(new QueryHandle(1, 1));
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  learner.addQuery(*handle);
+  EXPECT_TRUE(learner.hasActiveQueries());
+}
+
+TEST(LearnerTest, RemoveQueryTest) {
+  Learner learner;
+  std::unique_ptr<QueryHandle> handle;
+  handle.reset(new QueryHandle(1, 1));
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  learner.addQuery(*handle);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_FALSE(learner.hasActiveQueries());
+
+  handle.reset(new QueryHandle(2, 1));
+  learner.addQuery(*handle);
+  EXPECT_TRUE(learner.hasActiveQueries());
+  learner.removeQuery(handle->query_id());
+  EXPECT_FALSE(learner.hasActiveQueries());
+}
+
+}  // namespace quickstep


[07/11] incubator-quickstep git commit: Fixed getDenominator method. More unit tests.

Posted by hb...@apache.org.
Fixed getDenominator method. More unit tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2a0def65
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2a0def65
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2a0def65

Branch: refs/heads/scheduler++
Commit: 2a0def654a6d4f60363b41eea6d4755c250f977e
Parents: 5514e00
Author: Harshad Deshmukh <hb...@apache.org>
Authored: Sat Jun 25 09:35:10 2016 -0500
Committer: Harshad Deshmukh <hb...@apache.org>
Committed: Sat Jun 25 09:40:02 2016 -0500

----------------------------------------------------------------------
 query_execution/ExecutionStats.hpp         |  15 +-
 query_execution/Learner.cpp                |  46 +++--
 query_execution/Learner.hpp                | 147 ++++++++++++--
 query_execution/ProbabilityStore.hpp       |  15 +-
 query_execution/QueryExecutionTypedefs.hpp |   1 +
 query_execution/tests/Learner_unittest.cpp | 259 +++++++++++++++++++++++-
 6 files changed, 440 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/ExecutionStats.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ExecutionStats.hpp b/query_execution/ExecutionStats.hpp
index 769c7a4..5643749 100644
--- a/query_execution/ExecutionStats.hpp
+++ b/query_execution/ExecutionStats.hpp
@@ -58,17 +58,17 @@ class ExecutionStats {
   }
 
   /**
-   * @brief Check if there are any stats present.
+   * @brief Check if there are stats present for at least one active operator.
    **/
   inline bool hasStats() const {
     for (auto it = active_operators_.begin();
          it != active_operators_.end();
          ++it) {
-      if (!it->second->hasStatsForOperator()) {
-        return false;
+      if (it->second->hasStatsForOperator()) {
+        return true;
       }
     }
-    return true;
+    return false;
   }
 
   /**
@@ -109,14 +109,13 @@ class ExecutionStats {
    * @param operator_index The operator index which the value belongs to.
    **/
   void addEntry(std::size_t value, std::size_t operator_index) {
-    if (hasOperator(operator_index)) {
-      // This is not the first entry for the given operator.
-      active_operators_[operator_index]->addEntry(value);
-    } else {
+    if (!hasOperator(operator_index)) {
+      // This is the first entry for the given operator.
       // Create the OperatorStats object for this operator.
       active_operators_[operator_index] =
           std::unique_ptr<OperatorStats>(new OperatorStats(max_entries_));
     }
+    active_operators_[operator_index]->addEntry(value);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/Learner.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.cpp b/query_execution/Learner.cpp
index 720df33..11c3735 100644
--- a/query_execution/Learner.cpp
+++ b/query_execution/Learner.cpp
@@ -26,7 +26,6 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
@@ -76,10 +75,20 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     DCHECK(current_probabilities_[priority_level] != nullptr);
     // As we want the probability of the lone query in this priority level as
     // 1, we set the numerator same as denominator.
-    const std::size_t numerator =
-        current_probabilities_[priority_level]->getDenominator();
-    current_probabilities_[priority_level]->addOrUpdateObject(query_id,
-                                                              numerator);
+    // TODO(harshad) - Get the mean work order times here too and use that as
+    // the numerator.
+    ExecutionStats *stats = getExecutionStats(query_id);
+    auto query_stats = stats->getCurrentStats();
+    /*const std::size_t numerator =
+        current_probabilities_[priority_level]->getDenominator();*/
+    if (query_stats.second != 0) {
+      const float mean_workorder_time =
+          query_stats.first / static_cast<float>(query_stats.second);
+      if (mean_workorder_time != 0) {
+        current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
+            query_id, 1 / mean_workorder_time, 1 / mean_workorder_time);
+      }
+    }
     return;
   }
   // Else, there are more than one queries for the given priority level.
@@ -92,18 +101,25 @@ void Learner::updateProbabilitiesForQueriesInPriorityLevel(
     // queries.
     DCHECK(mean_workorders_per_query.find(query_id) !=
            mean_workorders_per_query.end());
+    DCHECK_NE(mean_workorders_per_query[query_id], 0);
     current_probabilities_[priority_level]->addOrUpdateObjectNewDenominator(
-        query_id, mean_workorders_per_query[query_id], denominator);
+        query_id,
+        1 / static_cast<float>(mean_workorders_per_query[query_id]),
+        denominator);
+    // LOG(INFO) << "Added stats on query ID: " << query_id << " priority: " << priority_level;
   } else {
     // At least one of the queries has predicted time for next work order as 0.
     // In such a case, we don't update the probabilities and continue to use
     // the older probabilities.
+    // LOG(INFO) << "Denominator is 0 QID: " << query_id << " priority: " << priority_level;
+    return;
   }
 }
 
 void Learner::updateProbabilitiesOfAllPriorityLevels() {
-  if (!hasFeedbackFromAllPriorityLevels() ||
-      has_feedback_from_all_queries_.empty()) {
+  if (!hasFeedbackFromAllPriorityLevels()) {
+      // has_feedback_from_all_queries_.empty()) {
+      // NOTE(harshad) : Not using this cache as it gets confusing.
     // Either we don't have enough feedback messages from all the priority
     // levels OR there are no active queries in the system.
     return;
@@ -111,9 +127,11 @@ void Learner::updateProbabilitiesOfAllPriorityLevels() {
   // Compute the predicted work order execution times for all the level.
   std::unordered_map<std::size_t, std::size_t> predicted_time_for_level;
   std::size_t sum_active_priorities = 0;
-  for (auto priority_iter : has_feedback_from_all_queries_) {
+  for (auto priority_iter = execution_stats_.begin();
+       priority_iter != execution_stats_.end();
+       ++priority_iter) {
     std::size_t total_time_curr_level = 0;
-    const std::size_t curr_priority_level = priority_iter.first;
+    const std::size_t curr_priority_level = priority_iter->first;
     sum_active_priorities += curr_priority_level;
     // For each query, find its predicted work order execution time.
     const std::unordered_map<std::size_t, std::size_t>
@@ -194,6 +212,7 @@ void Learner::initializeDefaultProbabilitiesForPriorityLevels() {
   for (auto priority_iter = execution_stats_.cbegin();
        priority_iter != execution_stats_.cend();
        ++priority_iter) {
+    DCHECK(!priority_iter->second.empty());
     const std::size_t curr_priority_level = priority_iter->first;
     sum_priority_levels += curr_priority_level;
     priority_levels.emplace_back(curr_priority_level);
@@ -217,7 +236,8 @@ void Learner::initializeQuery(const QueryHandle &query_handle) {
           new ExecutionStats(FLAGS_max_past_entries_learner)));
   // As we are initializing the query, we obviously haven't gotten any
   // feedback message for this query. Hence mark the following field as false.
-  has_feedback_from_all_queries_[priority_level] = false;
+  // has_feedback_from_all_queries_[priority_level] = false;
+  // NOTE(harshad) : Not using this cache as it gets confusing.
 }
 
 void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
@@ -226,7 +246,9 @@ void Learner::checkAndRemovePriorityLevel(const std::size_t priority_level) {
     execution_stats_.erase(priority_level);
     current_probabilities_.erase(priority_level);
     probabilities_of_priority_levels_->removeObject(priority_level);
-    has_feedback_from_all_queries_.erase(priority_level);
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // has_feedback_from_all_queries_.erase(priority_level);
+    // LOG(INFO) << "Removed priority level: " << priority_level;
     if (hasActiveQueries()) {
       if (static_cast<int>(priority_level) == highest_priority_level_) {
         // The priority level to be removed is the highest priority level.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/Learner.hpp
----------------------------------------------------------------------
diff --git a/query_execution/Learner.hpp b/query_execution/Learner.hpp
index f99b1c6..2c6fdef 100644
--- a/query_execution/Learner.hpp
+++ b/query_execution/Learner.hpp
@@ -27,11 +27,14 @@
 #include "query_execution/ExecutionStats.hpp"
 #include "query_execution/ProbabilityStore.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
+#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "query_optimizer/QueryHandle.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
 
+namespace serialization { class NormalWorkOrderCompletionMessage; }
+
 namespace quickstep {
 
 /** \addtogroup QueryExecution
@@ -49,16 +52,26 @@ class Learner {
       const serialization::NormalWorkOrderCompletionMessage
           &workorder_completion_proto);
 
+  /**
+   * @brief Add a query to the Learner.
+   *
+   * @param query_handle The query handle for the new query.
+   **/
   void addQuery(const QueryHandle &query_handle) {
     initializePriorityLevelIfNotPresent(query_handle.query_priority());
     initializeQuery(query_handle);
     relearn();
   }
 
+  /**
+   * @brief Remove a query from the Learner.
+   *
+   * @param query_id The ID of the query to be removed.
+   **/
   void removeQuery(const std::size_t query_id) {
-    // Find the iterator to the query in execution_stats_.
     DCHECK(isQueryPresent(query_id));
     const std::size_t priority_level = getQueryPriority(query_id);
+    // Find the iterator to the query in execution_stats_.
     auto stats_iter_mutable = getExecutionStatsIterMutable(query_id);
     execution_stats_[priority_level].erase(stats_iter_mutable);
     DCHECK(current_probabilities_.find(priority_level) !=
@@ -69,17 +82,25 @@ class Learner {
       // current_probabilities_[priority_level] ProbabilityStore.
       current_probabilities_[priority_level]->removeObject(query_id);
     }
+    // has_feedback_from_all_queries_[priority_level] = false;
     query_id_to_priority_lookup_.erase(query_id);
     checkAndRemovePriorityLevel(priority_level);
     relearn();
   }
 
-  void removeOperator(const std::size_t query_id, const std::size_t operator_id) {
+  /**
+   * @brief Drop operator_id's stats from query_id's (non-null) ExecutionStats.
+   **/
+  void removeOperator(const std::size_t query_id,
+                      const std::size_t operator_id) {
     ExecutionStats *stats = getExecutionStats(query_id);
     DCHECK(stats != nullptr);
     stats->removeOperator(operator_id);
   }
 
+  /**
+   * @brief Reset the probabilities and start learning again.
+   **/
   void relearn() {
     if (hasActiveQueries()) {
       initializeDefaultProbabilitiesForAllQueries();
@@ -87,10 +108,17 @@ class Learner {
     }
   }
 
+  /**
+   * @brief Check if the Learner is tracking at least one query.
+   **/
   inline const bool hasActiveQueries() const {
     return !query_id_to_priority_lookup_.empty();
   }
 
+  /**
+   * @brief Get the number of active queries in the Learner for the given
+   *        priority level.
+   **/
   inline const std::size_t getNumActiveQueriesInPriorityLevel(
       const std::size_t priority_level) const {
     const auto it = execution_stats_.find(priority_level);
@@ -101,6 +129,9 @@ class Learner {
     }
   }
 
+  /**
+   * @brief Get the number of queries the Learner is currently tracking.
+   **/
   inline const std::size_t getTotalNumActiveQueries() const {
     return query_id_to_priority_lookup_.size();
   }
@@ -115,6 +146,83 @@ class Learner {
     return highest_priority_level_;
   }
 
+  /**
+   * @brief Pick a priority level at random.
+   *
+   * @note The random number behind the pick is uniformly distributed.
+   *
+   * @return The picked priority level, or kInvalidPriorityLevel when no
+   *         queries are present in the learner.
+   **/
+  inline const int pickRandomPriorityLevel() const {
+    // Guard: with no active queries there is no priority level to pick.
+    if (!hasActiveQueries()) {
+      return kInvalidPriorityLevel;
+    }
+    // The store hands back the priority level as a std::size_t property;
+    // it is narrowed to int so that it shares a return type with the
+    // kInvalidPriorityLevel sentinel.
+    const std::size_t picked_level =
+        probabilities_of_priority_levels_->pickRandomProperty();
+    return static_cast<int>(picked_level);
+  }
+
+  /**
+   * @brief Pick a query at random across all the priority levels.
+   *
+   * @note A priority level is picked first, then a query within that level.
+   *
+   * @return The picked query ID, or kInvalidQueryID when no queries are
+   *         present in the learner.
+   **/
+  inline const int pickRandomQuery() const {
+    // Guard: nothing can be picked while the learner has no active query.
+    if (!hasActiveQueries()) {
+      return kInvalidQueryID;
+    }
+    // Stage 1: choose the priority level to draw from.
+    const int chosen_priority_level = pickRandomPriorityLevel();
+    // Note : The valid priority level values are non-zero.
+    DCHECK_GT(chosen_priority_level, 0);
+    // Stage 2: choose a query within that priority level. This may still be
+    // kInvalidQueryID if the level has no objects in its probability store.
+    return pickRandomQueryFromPriorityLevel(
+        static_cast<std::size_t>(chosen_priority_level));
+  }
+
+  /**
+   * @brief Randomly pick a query from a given priority level in the learner.
+   *
+   * @note The query is drawn from current_probabilities_ once all the queries
+   *       in the level have feedback, and from default_probabilities_ before.
+   *
+   * @return A query ID. If no queries are present for this priority level in
+   *         the learner, return kInvalidQueryID.
+   **/
+  inline const int pickRandomQueryFromPriorityLevel(
+      const std::size_t priority_level) const {
+    DCHECK(isPriorityLevelPresent(priority_level));
+    if (!hasActiveQueries()) {
+      return kInvalidQueryID;
+    }
+    if (hasFeedbackFromAllQueriesInPriorityLevel(priority_level)) {
+      const auto it = current_probabilities_.find(priority_level);
+      DCHECK(it != current_probabilities_.end());
+      DCHECK(it->second != nullptr);
+      if (it->second->getNumObjects() > 0) {
+        return static_cast<int>(it->second->pickRandomProperty());
+      }
+    } else {
+      const auto it = default_probabilities_.find(priority_level);
+      DCHECK(it != default_probabilities_.end());
+      DCHECK(it->second != nullptr);
+      if (it->second->getNumObjects() > 0) {
+        return static_cast<int>(it->second->pickRandomProperty());
+      }
+    }
+    return kInvalidQueryID;
+  }
+
  private:
   /**
    * @brief Update the probabilities for queries in the given priority level.
@@ -261,6 +369,8 @@ class Learner {
    **/
   inline bool hasFeedbackFromAllQueriesInPriorityLevel(
       const std::size_t priority_level) const {
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    // return has_feedback_from_all_queries_.at(priority_level);
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
         &stats_vector = execution_stats_.at(priority_level);
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {
@@ -275,16 +385,19 @@ class Learner {
   inline void updateFeedbackFromQueriesInPriorityLevel(
       const std::size_t priority_level) {
     const std::vector<std::pair<std::size_t, std::unique_ptr<ExecutionStats>>>
-        &stats_vector = execution_stats_.at(priority_level);
+        &stats_vector = execution_stats_[priority_level];
     for (std::size_t i = 0; i < stats_vector.size(); ++i) {
       DCHECK(stats_vector[i].second != nullptr);
       if (!stats_vector[i].second->hasStats()) {
         // At least one query has no statistics so far.
+        // NOTE(harshad) : The has_feedback_from_all_queries_ cache is not used
+        // as it gets confusing, so there is no flag to reset to false here.
         return;
       }
     }
     // All the queries have at least one execution statistic.
-    has_feedback_from_all_queries_[priority_level] = true;
+    // NOTE(review): Nothing is recorded here now that the cache is unused,
+    // and execution_stats_[...] above can add a missing level - TODO confirm.
   }
 
   /**
@@ -313,31 +426,36 @@ class Learner {
   }
 
   /**
-   * @param mean_workorder_per_query A vector of pairs in which:
-   *        1st element is mean time per work order
-   *        2nd element is the query ID.
+   * @param mean_workorder_per_query An unordered_map in which:
+   *        1st element (the key) is the query ID.
+   *        2nd element (the value) is the mean time per work order.
    *
    * @note If any query has mean work order time as 0, we return 0 as the
    *       denominator.
    *
    * @return The denominator to be used for probability calculations.
    **/
-  inline float calculateDenominator(std::unordered_map<std::size_t, std::size_t>
-                                        &mean_workorder_per_query) const {
+  inline float calculateDenominator(
+      const std::unordered_map<std::size_t, std::size_t>
+          &mean_workorder_per_query) const {
     float denominator = 0;
     for (const auto &element : mean_workorder_per_query) {
       if (element.second != 0) {
         denominator += 1/static_cast<float>(element.second);
-      } else {
-        return 0;
+        // NOTE(review): a zero mean time is now skipped, not turned into a 0
+        // denominator as the @note in the header comment still states.
       }
     }
     return denominator;
   }
 
   inline bool hasFeedbackFromAllPriorityLevels() const {
-    for (auto feedback : has_feedback_from_all_queries_) {
-      if (!hasFeedbackFromAllQueriesInPriorityLevel(feedback.first)) {
+    // for (auto feedback : has_feedback_from_all_queries_) {
+    // NOTE(harshad) : Not using this cache as it gets confusing.
+    for (auto priority_iter = default_probabilities_.cbegin();
+         priority_iter != default_probabilities_.cend();
+         ++priority_iter) {
+      if (!hasFeedbackFromAllQueriesInPriorityLevel(priority_iter->first)) {
         return false;
       }
     }
@@ -369,9 +487,10 @@ class Learner {
   // ProbabilityStrore for probabilities mapped to the priority levels.
   std::unique_ptr<ProbabilityStore> probabilities_of_priority_levels_;
 
+  // NOTE(harshad) : Not using this cache as it gets confusing.
   // Key = priority level. Value = A boolean that indicates if we have received
   // feedback from all the queries in the given priority level.
-  std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
+  // std::unordered_map<std::size_t, bool> has_feedback_from_all_queries_;
 
   int highest_priority_level_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/ProbabilityStore.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ProbabilityStore.hpp b/query_execution/ProbabilityStore.hpp
index 233dd2e..ed52f75 100644
--- a/query_execution/ProbabilityStore.hpp
+++ b/query_execution/ProbabilityStore.hpp
@@ -41,7 +41,7 @@ class ProbabilityStore {
    * @brief Constructor.
    **/
   ProbabilityStore()
-      : common_denominator_(1.0), mt_(std::random_device()()) {}
+      : common_denominator_(1.0) {}
 
   /**
    * @brief Get the number of objects in the store.
@@ -221,11 +221,16 @@ class ProbabilityStore {
   /**
    * @brief Return a randomly chosen property.
    *
+   * The engine is thread_local and seeded from std::random_device once per
+   * thread, instead of creating a random device on every invocation. It is
+   * not a class variable, so this function can still be marked as const.
+   *
    * @note The random number is uniformly chosen.
    **/
-  inline const std::size_t pickRandomProperty() {
+  inline const std::size_t pickRandomProperty() const {
     std::uniform_real_distribution<float> dist(0.0, 1.0);
-    const float chosen_probability = dist(mt_);
+    thread_local std::mt19937_64 mt(std::random_device{}());
+    const float chosen_probability = dist(mt);
     return getPropertyForProbability(chosen_probability);
   }
 
@@ -260,7 +265,7 @@ class ProbabilityStore {
    *         or equal to the input cumulative probability.
    **/
   inline const std::size_t getPropertyForProbability(
-      const float key_cumulative_probability) {
+      const float key_cumulative_probability) const {
     DCHECK(!cumulative_probabilities_.empty());
     // It doesn't matter in which order the objects are arranged in the
     // cumulative_probabilities_ vector.
@@ -296,8 +301,6 @@ class ProbabilityStore {
 
   float common_denominator_;
 
-  std::mt19937_64 mt_;
-
   DISALLOW_COPY_AND_ASSIGN(ProbabilityStore);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/QueryExecutionTypedefs.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index e13f3e0..feaa285 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -44,6 +44,7 @@ typedef tmb::client_id client_id;
 typedef tmb::message_type_id message_type_id;
 
 const int kInvalidPriorityLevel = -1;
+const int kInvalidQueryID = -1;
 
 using ClientIDMap = ThreadIDBasedMap<client_id,
                                      'C',

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a0def65/query_execution/tests/Learner_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/Learner_unittest.cpp b/query_execution/tests/Learner_unittest.cpp
index 107576f..7d67b1b 100644
--- a/query_execution/tests/Learner_unittest.cpp
+++ b/query_execution/tests/Learner_unittest.cpp
@@ -220,15 +220,16 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
 
   // Randomize the orders.
   std::random_device rd;
-  std::mt19937 g(rd());
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
 
   std::shuffle(priorities_insertion_order.begin(),
                priorities_insertion_order.end(),
-               g);
+               g1);
 
   std::shuffle(priorities_removal_order.begin(),
                priorities_removal_order.end(),
-               g);
+               g2);
 
   Learner learner;
   EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
@@ -263,4 +264,256 @@ TEST_F(LearnerTest, HighestPriorityLevelTest) {
   EXPECT_EQ(kInvalidPriorityLevel, learner.getHighestPriorityLevel());
 }
 
+TEST_F(LearnerTest, PickRandomPriorityLevelTest) {
+  std::vector<std::size_t> priorities_insertion_order;
+  std::vector<std::size_t> priorities_removal_order;
+  const std::size_t kNumPrioritiesToTest = 20;
+  for (std::size_t priority_num = 1;
+       priority_num <= kNumPrioritiesToTest;
+       ++priority_num) {
+    // Note: Priority level should be non-zero, hence we begin from 1.
+    priorities_insertion_order.emplace_back(priority_num);
+    priorities_removal_order.emplace_back(priority_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(priorities_insertion_order.begin(),
+               priorities_insertion_order.end(),
+               g1);
+
+  std::shuffle(priorities_removal_order.begin(),
+               priorities_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+
+  std::unique_ptr<QueryHandle> handle;
+  // First insert the queries in the order of priorities as defined by
+  // priorities_insertion_order.
+  for (auto it = priorities_insertion_order.begin();
+       it != priorities_insertion_order.end();
+       ++it) {
+    // Note that the query ID is kept the same as priority level for simplicity.
+    handle.reset(new QueryHandle(*it, *it));
+    learner.addQuery(*handle);
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level among the priority
+    // levels inserted so far, i.e. in the range [begin, it + 1).
+    auto find_priority_level_it = std::find(
+        priorities_insertion_order.begin(), it + 1, picked_priority_level);
+    // A failed search returns the end of the searched range, i.e. it + 1.
+    EXPECT_TRUE(find_priority_level_it != it + 1);
+  }
+
+  // Repeat the tests a few more times.
+  const std::size_t kNumTests = 200;
+  for (std::size_t test_num = 0; test_num < kNumTests; ++test_num) {
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level in the
+    // priorities_insertion_order vector.
+    auto find_priority_level_it = std::find(priorities_insertion_order.begin(),
+                                            priorities_insertion_order.end(),
+                                            picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != priorities_insertion_order.end());
+  }
+
+  // Now remove the queries in the order of priorities as defined by
+  // priorities_removal_order.
+  for (auto it = priorities_removal_order.begin();
+       it != priorities_removal_order.end();
+       ++it) {
+    // Recall that the query ID is the same as priority level.
+    const std::size_t picked_priority_level = learner.pickRandomPriorityLevel();
+    // Try to find the randomly picked priority level in the
+    // priorities_removal_order vector.
+    auto find_priority_level_it = std::find(
+        it, priorities_removal_order.end(), picked_priority_level);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_priority_level_it != priorities_removal_order.end());
+    learner.removeQuery(*it);
+  }
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidPriorityLevel, learner.pickRandomPriorityLevel());
+}
+
+TEST_F(LearnerTest, PickRandomQueryDefaultProbabilitiesTest) {
+  // We use a set of unique query IDs. For each query ID, we assign a priority
+  // level. The set of priority levels is smaller than the set of query IDs, so
+  // that we can have more than one query for a given priority level.
+
+  // Also, in this test we don't send any completion feedback message to the
+  // learner. Therefore it always refers to the default probabilities set for
+  // the queries.
+  std::vector<std::size_t> query_ids_insertion_order;
+  std::vector<std::size_t> query_ids_removal_order;
+  const std::size_t kNumQueriesToTest = 20;
+  for (std::size_t query_num = 0;
+       query_num < kNumQueriesToTest;
+       ++query_num) {
+    query_ids_insertion_order.emplace_back(query_num);
+    query_ids_removal_order.emplace_back(query_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(query_ids_insertion_order.begin(),
+               query_ids_insertion_order.end(),
+               g1);
+
+  std::shuffle(query_ids_removal_order.begin(),
+               query_ids_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+  std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+  std::size_t priority_level_index = 0;
+  std::unique_ptr<QueryHandle> handle;
+  // Insert the queries in the order as defined in query_ids_insertion_order.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+    priority_level_index = (priority_level_index + 1) % priority_levels.size();
+    learner.addQuery(*handle);
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID among the inserted queries.
+    auto find_query_it = std::find(
+        query_ids_insertion_order.begin(), it + 1, picked_query_id);
+    // A failed search returns the end of the searched range, i.e. it + 1.
+    EXPECT_TRUE(find_query_it != it + 1);
+  }
+
+  // Repeat the tests a few more times.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(query_ids_insertion_order.begin(),
+                                   query_ids_insertion_order.end(),
+                                   picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Remove the queries in the order as defined in query_ids_removal_order.
+  for (auto it = query_ids_removal_order.begin();
+       it != query_ids_removal_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_removal_order.
+    auto find_query_it = std::find(
+        it, query_ids_removal_order.end(), picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+    learner.removeQuery(*it);
+  }
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
+
+TEST_F(LearnerTest, PickRandomQueryCurrentProbabilitiesTest) {
+  // We use a set of unique query IDs. For each query ID, we assign a priority
+  // level. The set of priority levels is smaller than the set of query IDs, so
+  // that we can have more than one query for a given priority level.
+
+  // In this test we send completion feedback messages for all the queries
+  // to the learner. Therefore it refers to the current probabilities set for
+  // the queries.
+  std::vector<std::size_t> query_ids_insertion_order;
+  std::vector<std::size_t> query_ids_removal_order;
+  const std::size_t kNumQueriesToTest = 20;
+  for (std::size_t query_num = 0;
+       query_num < kNumQueriesToTest;
+       ++query_num) {
+    query_ids_insertion_order.emplace_back(query_num);
+    query_ids_removal_order.emplace_back(query_num);
+  }
+
+  // Randomize the orders.
+  std::random_device rd;
+  std::mt19937 g1(rd());
+  std::mt19937 g2(rd());
+
+  std::shuffle(query_ids_insertion_order.begin(),
+               query_ids_insertion_order.end(),
+               g1);
+
+  std::shuffle(query_ids_removal_order.begin(),
+               query_ids_removal_order.end(),
+               g2);
+
+  Learner learner;
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+
+  std::vector<std::size_t> priority_levels {1, 3, 5, 9};
+  std::size_t priority_level_index = 0;
+  std::unique_ptr<QueryHandle> handle;
+  // Insert the queries in the order as defined in query_ids_insertion_order.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    handle.reset(new QueryHandle(*it, priority_levels[priority_level_index]));
+    priority_level_index = (priority_level_index + 1) % priority_levels.size();
+    learner.addQuery(*handle);
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID among the inserted queries.
+    auto find_query_it = std::find(
+        query_ids_insertion_order.begin(), it + 1, picked_query_id);
+    // A failed search returns the end of the searched range, i.e. it + 1.
+    EXPECT_TRUE(find_query_it != it + 1);
+  }
+
+  // Now send one completion feedback message per query to the learner.
+  const std::size_t kOperatorID = 0;
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    // LOG(INFO) << "Completion message for query : " << *it;
+    learner.addCompletionFeedback(createMockCompletionMessage(*it, kOperatorID));
+  }
+
+  // Repeat the tests a few more times.
+  for (auto it = query_ids_insertion_order.begin();
+       it != query_ids_insertion_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_insertion_order.
+    auto find_query_it = std::find(query_ids_insertion_order.begin(),
+                                   query_ids_insertion_order.end(),
+                                   picked_query_id);
+    // We expect the search to be successful.
+    EXPECT_TRUE(find_query_it != query_ids_insertion_order.end());
+  }
+
+  // Remove the queries in the order as defined in query_ids_removal_order.
+  for (auto it = query_ids_removal_order.begin();
+       it != query_ids_removal_order.end();
+       ++it) {
+    const int picked_query_id = learner.pickRandomQuery();
+    // Try to find the randomly picked query ID in query_ids_removal_order.
+    auto find_query_it = std::find(
+        it, query_ids_removal_order.end(), picked_query_id);
+    // We expect the search to be successful.
+    // LOG(INFO) << "Picked query ID: " << picked_query_id << "\n";
+    EXPECT_TRUE(find_query_it != query_ids_removal_order.end());
+    learner.removeQuery(*it);
+    // LOG(INFO) << "Removed query ID: " << *it;
+  }
+
+  EXPECT_FALSE(learner.hasActiveQueries());
+  EXPECT_EQ(kInvalidQueryID, learner.pickRandomQuery());
+}
 }  // namespace quickstep