You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/05/04 03:22:50 UTC

[06/32] incubator-quickstep git commit: Add common-subexpression support.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/rules/CollapseSelection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CollapseSelection.hpp b/query_optimizer/rules/CollapseSelection.hpp
new file mode 100644
index 0000000..bc5e4a3
--- /dev/null
+++ b/query_optimizer/rules/CollapseSelection.hpp
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_COLLAPSE_SELECTION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_COLLAPSE_SELECTION_HPP_
+
+#include <string>
+
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Merges nested Selections into one single Selection.
+ */
+class CollapseSelection : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor. The class declares no data members of its own, so
+   *        there is nothing to initialize here.
+   */
+  CollapseSelection() {}
+
+  /**
+   * @return The fixed name of this rule, "CollapseSelection".
+   */
+  std::string getName() const override { return "CollapseSelection"; }
+
+ protected:
+  /**
+   * @brief Overrides the base rule's applyToNode(); defined out of line.
+   *
+   * @param input The physical node passed to the rule.
+   * @return The physical node produced for \p input.
+   */
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &input) override;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CollapseSelection);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_COLLAPSE_SELECTION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/rules/ExtractCommonSubexpression.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.cpp b/query_optimizer/rules/ExtractCommonSubexpression.cpp
new file mode 100644
index 0000000..e3f996c
--- /dev/null
+++ b/query_optimizer/rules/ExtractCommonSubexpression.cpp
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/ExtractCommonSubexpression.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/CommonSubexpression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "utility/HashError.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+ExtractCommonSubexpression::ExtractCommonSubexpression(
+    OptimizerContext *optimizer_context)
+    : optimizer_context_(optimizer_context) {
+  // Register the expression types that are treated as homogeneous. These are
+  // the types whose children visitAndCount() and visitAndTransform() descend
+  // into while keeping the same counter / substitution map.
+  const std::vector<E::ExpressionType> homogeneous_types = {
+      E::ExpressionType::kAlias,
+      E::ExpressionType::kAttributeReference,
+      E::ExpressionType::kBinaryExpression,
+      E::ExpressionType::kCast,
+      E::ExpressionType::kCommonSubexpression,
+      E::ExpressionType::kScalarLiteral,
+      E::ExpressionType::kUnaryExpression
+  };
+
+  homogeneous_expression_types_.insert(homogeneous_types.begin(),
+                                       homogeneous_types.end());
+}
+
+P::PhysicalPtr ExtractCommonSubexpression::apply(const P::PhysicalPtr &input) {
+  // Entry point of the rule: the plan handed in must be rooted at a
+  // TopLevelPlan node (checked in debug builds only). The actual rewrite is
+  // the recursive applyInternal().
+  DCHECK(P::PhysicalType::kTopLevelPlan == input->getPhysicalType());
+  return applyInternal(input);
+}
+
+P::PhysicalPtr ExtractCommonSubexpression::applyInternal(
+    const P::PhysicalPtr &input) {
+  // Rewrite the subtrees first, so that this node is handled only after all
+  // of its children.
+  std::vector<P::PhysicalPtr> rewritten_children;
+  for (const auto &child : input->children()) {
+    rewritten_children.push_back(applyInternal(child));
+  }
+
+  // Only copy the node when at least one child was replaced.
+  P::PhysicalPtr node = input;
+  if (rewritten_children != input->children()) {
+    node = input->copyWithNewChildren(rewritten_children);
+  }
+
+  // Label common subexpressions among the expressions owned by this node.
+  // Each case returns a rebuilt node when something changed; otherwise
+  // control falls out of the switch and the node is returned as is.
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+
+      // Flatten into one list: the grouping expressions first, followed by
+      // the arguments of every aggregate function in declaration order.
+      std::vector<E::ExpressionPtr> original;
+      for (const auto &grouping_expr : aggregate->grouping_expressions()) {
+        original.push_back(grouping_expr);
+      }
+      for (const auto &alias : aggregate->aggregate_expressions()) {
+        const E::AggregateFunctionPtr func =
+            std::static_pointer_cast<const E::AggregateFunction>(
+                alias->expression());
+        for (const auto &argument : func->getArguments()) {
+          original.push_back(argument);
+        }
+      }
+
+      const std::vector<E::ExpressionPtr> labelled =
+          transformExpressions(original);
+      if (labelled == original) {
+        break;
+      }
+
+      // The first num_grouping entries of the flattened list are the
+      // grouping expressions.
+      const std::size_t num_grouping =
+          aggregate->grouping_expressions().size();
+      std::vector<E::NamedExpressionPtr> grouping_out;
+      for (std::size_t pos = 0; pos < num_grouping; ++pos) {
+        DCHECK(E::SomeNamedExpression::Matches(labelled[pos]));
+        grouping_out.push_back(
+            std::static_pointer_cast<const E::NamedExpression>(labelled[pos]));
+      }
+
+      // The remaining entries are consumed, in order, as the arguments of
+      // each aggregate function.
+      std::vector<E::AliasPtr> aggregates_out;
+      std::size_t cursor = num_grouping;
+      for (const auto &alias : aggregate->aggregate_expressions()) {
+        const E::AggregateFunctionPtr func =
+            std::static_pointer_cast<const E::AggregateFunction>(
+                alias->expression());
+        const std::vector<E::ScalarPtr> &old_args = func->getArguments();
+
+        std::vector<E::ScalarPtr> new_args;
+        for (std::size_t k = 0; k < old_args.size(); ++k, ++cursor) {
+          DCHECK(E::SomeScalar::Matches(labelled[cursor]));
+          new_args.push_back(
+              std::static_pointer_cast<const E::Scalar>(labelled[cursor]));
+        }
+
+        if (new_args == old_args) {
+          // Untouched arguments: reuse the existing alias.
+          aggregates_out.push_back(alias);
+          continue;
+        }
+
+        // Rebuild the function with the labelled arguments and re-wrap it in
+        // an alias carrying the same id and names as before.
+        const E::AggregateFunctionPtr rebuilt_func =
+            E::AggregateFunction::Create(func->getAggregate(),
+                                         new_args,
+                                         func->is_vector_aggregate(),
+                                         func->is_distinct());
+        aggregates_out.push_back(
+            E::Alias::Create(alias->id(),
+                             rebuilt_func,
+                             alias->attribute_name(),
+                             alias->attribute_alias(),
+                             alias->relation_name()));
+      }
+
+      return P::Aggregate::Create(aggregate->input(),
+                                  grouping_out,
+                                  aggregates_out,
+                                  aggregate->filter_predicate());
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr selection =
+          std::static_pointer_cast<const P::Selection>(node);
+
+      // Only the project expressions are rewritten; the filter predicate is
+      // passed through unchanged.
+      const std::vector<E::ExpressionPtr> labelled =
+          transformExpressions(UpCast(selection->project_expressions()));
+      const std::vector<E::NamedExpressionPtr> projections =
+          DownCast<E::NamedExpression>(labelled);
+
+      if (projections == selection->project_expressions()) {
+        break;
+      }
+      return P::Selection::Create(selection->input(),
+                                  projections,
+                                  selection->filter_predicate());
+    }
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr hash_join =
+          std::static_pointer_cast<const P::HashJoin>(node);
+
+      // Only the project expressions are rewritten; join attributes and the
+      // residual predicate are passed through unchanged.
+      const std::vector<E::ExpressionPtr> labelled =
+          transformExpressions(UpCast(hash_join->project_expressions()));
+      const std::vector<E::NamedExpressionPtr> projections =
+          DownCast<E::NamedExpression>(labelled);
+
+      if (projections == hash_join->project_expressions()) {
+        break;
+      }
+      return P::HashJoin::Create(hash_join->left(),
+                                 hash_join->right(),
+                                 hash_join->left_join_attributes(),
+                                 hash_join->right_join_attributes(),
+                                 hash_join->residual_predicate(),
+                                 projections,
+                                 hash_join->join_type());
+    }
+    case P::PhysicalType::kNestedLoopsJoin: {
+      const P::NestedLoopsJoinPtr nested_loops_join =
+          std::static_pointer_cast<const P::NestedLoopsJoin>(node);
+
+      // Only the project expressions are rewritten; the join predicate is
+      // passed through unchanged.
+      const std::vector<E::ExpressionPtr> labelled =
+          transformExpressions(UpCast(nested_loops_join->project_expressions()));
+      const std::vector<E::NamedExpressionPtr> projections =
+          DownCast<E::NamedExpression>(labelled);
+
+      if (projections == nested_loops_join->project_expressions()) {
+        break;
+      }
+      return P::NestedLoopsJoin::Create(nested_loops_join->left(),
+                                        nested_loops_join->right(),
+                                        nested_loops_join->join_predicate(),
+                                        projections);
+    }
+    default:
+      break;
+  }
+
+  return node;
+}
+
+std::vector<E::ExpressionPtr> ExtractCommonSubexpression::transformExpressions(
+    const std::vector<E::ExpressionPtr> &expressions) {
+  // Pass 1: count how many times each subexpression occurs across the whole
+  // list, and remember which subexpressions could be counted at all.
+  ScalarCounter occurrence_counter;
+  ScalarHashable countable;
+  for (const E::ExpressionPtr &root : expressions) {
+    visitAndCount(root, &occurrence_counter, &countable);
+  }
+
+  // Any subexpression with count > 1 is a common subexpression. However, it
+  // is not necessary to create a CommonSubexpression node for every such
+  // subexpression. E.g. consider the case
+  // --
+  //   SELECT (x+1)*(x+2), (x+1)*(x+2)*3 FROM s;
+  // --
+  // Only one *dominant* CommonSubexpression (x+1)*(x+2) is needed; there is
+  // no need to create nodes for its children (x+1) or (x+2).
+  //
+  // To address this issue, we define that a subtree S *dominates* its
+  // descendant subtree (or leaf node) T if and only if
+  // counter[S] >= counter[T].
+  //
+  // CommonSubexpression nodes are then created for every subexpression that
+  // is not dominated by any ancestor.
+
+  // Pass 2: rewrite each expression in order. The substitution map is shared
+  // across the list so that repeated subexpressions refer to the same
+  // CommonSubexpression node.
+  ScalarMap substitutions;
+  std::vector<E::ExpressionPtr> rewritten;
+  for (const E::ExpressionPtr &root : expressions) {
+    rewritten.push_back(visitAndTransform(
+        root, 1, occurrence_counter, countable, &substitutions));
+  }
+  return rewritten;
+}
+
+E::ExpressionPtr ExtractCommonSubexpression::transformExpression(
+    const E::ExpressionPtr &expression) {
+  // Single-expression convenience wrapper: run the list version on a
+  // one-element list and hand back its only result.
+  const std::vector<E::ExpressionPtr> singleton = {expression};
+  return transformExpressions(singleton).front();
+}
+
+bool ExtractCommonSubexpression::visitAndCount(
+    const E::ExpressionPtr &expression,
+    ScalarCounter *counter,
+    ScalarHashable *hashable) {
+  // Returns true iff this expression was counted, i.e. every visited child
+  // was counted, the expression is a Scalar, and hashing it is supported.
+
+  // Tracks whether all visited children were counted; when one was not, the
+  // hash() computation for this expression is skipped altogether.
+  bool all_children_counted = true;
+
+  // Children are only visited for homogeneous expression types. Every child
+  // is visited even after one of them has turned out not to be countable.
+  const bool is_homogeneous =
+      homogeneous_expression_types_.find(expression->getExpressionType())
+          != homogeneous_expression_types_.end();
+  if (is_homogeneous) {
+    for (const auto &child : expression->children()) {
+      if (!visitAndCount(child, counter, hashable)) {
+        all_children_counted = false;
+      }
+    }
+  }
+
+  if (!all_children_counted) {
+    return false;
+  }
+
+  E::ScalarPtr scalar;
+  if (!E::SomeScalar::MatchesWithConditionalCast(expression, &scalar)) {
+    return false;
+  }
+
+  // A scalar expression may or may not support the hash() computation.
+  // In the latter case, a HashNotSupported exception will be thrown and we
+  // simply ignore this expression (and all its ancestor expressions).
+  try {
+    ++(*counter)[scalar];
+  } catch (const HashNotSupported &) {
+    return false;
+  }
+
+  hashable->emplace(scalar);
+  return true;
+}
+
+E::ExpressionPtr ExtractCommonSubexpression::visitAndTransform(
+    const E::ExpressionPtr &expression,
+    const std::size_t max_reference_count,
+    const ScalarCounter &counter,
+    const ScalarHashable &hashable,
+    ScalarMap *substitution_map) {
+  // Rewrites one expression subtree, wrapping each dominant common
+  // subexpression in a CommonSubexpression node.
+  //
+  //   expression:          root of the subtree to rewrite.
+  //   max_reference_count: occurrence count inherited from the nearest
+  //                        counted ancestor (transformExpressions() starts
+  //                        each root at 1).
+  //   counter / hashable:  results of the preceding visitAndCount() pass.
+  //   substitution_map:    CommonSubexpression nodes created so far, keyed by
+  //                        the scalar they replace; updated in place.
+  //
+  // Returns the original pointer when nothing inside the subtree changed.
+
+  // TODO(jianqiao): Currently it is hardly beneficial to make AttributeReference
+  // a common subexpression due to the inefficiency of ScalarAttribute's
+  // size-not-known-at-compile-time std::memcpy() calls, compared to copy-elision
+  // at selection level. Even in the case of compressed column store, it requires
+  // an attribute to occur at least 4 times for the common subexpression version
+  // to outperform the direct decoding version. We may look into ScalarAttribute
+  // and modify the heuristic here later.
+  if (expression->getExpressionType() == E::ExpressionType::kScalarLiteral ||
+      expression->getExpressionType() == E::ExpressionType::kAttributeReference) {
+    return expression;
+  }
+
+  // Only scalars that were successfully counted take part in the dominance
+  // bookkeeping; anything else is just traversed.
+  E::ScalarPtr scalar;
+  const bool is_hashable =
+      E::SomeScalar::MatchesWithConditionalCast(expression, &scalar)
+          && hashable.find(scalar) != hashable.end();
+
+  std::size_t new_max_reference_count;
+  if (is_hashable) {
+    // CommonSubexpression node already generated somewhere. Just refer to that
+    // and return.
+    const auto substitution_map_it = substitution_map->find(scalar);
+    if (substitution_map_it != substitution_map->end()) {
+      return substitution_map_it->second;
+    }
+
+    // Otherwise, update the dominance count.
+    const auto counter_it = counter.find(scalar);
+    DCHECK(counter_it != counter.end());
+    DCHECK_LE(max_reference_count, counter_it->second);
+    new_max_reference_count = counter_it->second;
+  } else {
+    new_max_reference_count = max_reference_count;
+  }
+
+  // Process children.
+  std::vector<E::ExpressionPtr> new_children;
+  const auto homogeneous_expression_types_it =
+      homogeneous_expression_types_.find(expression->getExpressionType());
+  if (homogeneous_expression_types_it == homogeneous_expression_types_.end()) {
+    // If child subexpressions cannot be hoisted through the current expression,
+    // treat child expressions as isolated sub-optimizations.
+    for (const auto &child : expression->children()) {
+      new_children.emplace_back(transformExpression(child));
+    }
+  } else {
+    for (const auto &child : expression->children()) {
+      new_children.emplace_back(
+          visitAndTransform(child,
+                            new_max_reference_count,
+                            counter,
+                            hashable,
+                            substitution_map));
+    }
+  }
+
+  E::ExpressionPtr output;
+  if (new_children == expression->children()) {
+    output = expression;
+  } else {
+    // The expression is not necessarily a Scalar here (is_hashable is false
+    // in that case), so keep the rebuilt node as a plain ExpressionPtr instead
+    // of statically downcasting it to Scalar.
+    output = expression->copyWithNewChildren(new_children);
+  }
+
+  // Wrap it with a new CommonSubexpression node if the current expression is a
+  // dominant subexpression, i.e. it occurs strictly more often than its
+  // nearest counted ancestor.
+  if (is_hashable && new_max_reference_count > max_reference_count) {
+    DCHECK(E::SomeScalar::Matches(output));
+    const E::CommonSubexpressionPtr common_subexpression =
+        E::CommonSubexpression::Create(
+            optimizer_context_->nextExprId(),
+            std::static_pointer_cast<const E::Scalar>(output));
+    substitution_map->emplace(scalar, common_subexpression);
+    output = common_subexpression;
+  }
+
+  return output;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/rules/ExtractCommonSubexpression.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.hpp b/query_optimizer/rules/ExtractCommonSubexpression.hpp
new file mode 100644
index 0000000..3cdd70e
--- /dev/null
+++ b/query_optimizer/rules/ExtractCommonSubexpression.hpp
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_
+
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/expressions/CommonSubexpression.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to identify and label common
+ *        subexpressions.
+ *
+ * @note This is essentially a logical optimization pass. However, we need some
+ *       of the physical passes (e.g. ReuseAggregateExpressions) to be finalized
+ *       before this one to maximize optimization opportunities.
+ */
+class ExtractCommonSubexpression : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param optimizer_context The optimizer context; the pointer is stored,
+   *        not owned by this class as far as this header shows.
+   */
+  explicit ExtractCommonSubexpression(OptimizerContext *optimizer_context);
+
+  ~ExtractCommonSubexpression() override {}
+
+  std::string getName() const override { return "ExtractCommonSubexpression"; }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input);
+
+  // Hash functor that delegates to the scalar's own hash().
+  struct ScalarHash {
+    std::size_t operator()(const expressions::ScalarPtr &scalar) const {
+      return scalar->hash();
+    }
+  };
+
+  // Equality functor that delegates to the scalar's own equals(), so that
+  // distinct pointers can still be treated as the same key.
+  struct ScalarEqual {
+    bool operator()(const expressions::ScalarPtr &lhs,
+                    const expressions::ScalarPtr &rhs) const {
+      return lhs->equals(rhs);
+    }
+  };
+
+  // For memoizing whether a subexpression is hashable. Note that this set
+  // uses the default hash/equality of the pointer type, not the functors
+  // above.
+  using ScalarHashable = std::unordered_set<expressions::ScalarPtr>;
+
+  // For counting the number of occurrences of each subexpression.
+  using ScalarCounter =
+      std::unordered_map<expressions::ScalarPtr,
+                         std::size_t,
+                         ScalarHash,
+                         ScalarEqual>;
+
+  // For mapping each subexpression to its transformed version.
+  using ScalarMap =
+      std::unordered_map<expressions::ScalarPtr,
+                         expressions::CommonSubexpressionPtr,
+                         ScalarHash,
+                         ScalarEqual>;
+
+  // Labels common subexpressions across a whole list of expressions.
+  std::vector<expressions::ExpressionPtr> transformExpressions(
+      const std::vector<expressions::ExpressionPtr> &expressions);
+
+  // Single-expression counterpart of transformExpressions().
+  expressions::ExpressionPtr transformExpression(
+      const expressions::ExpressionPtr &expression);
+
+  // Traverse the expression tree and increase the count of each subexpression.
+  bool visitAndCount(
+      const expressions::ExpressionPtr &expression,
+      ScalarCounter *counter,
+      ScalarHashable *hashable);
+
+  // Traverse the expression tree and transform subexpressions (to common
+  // subexpressions) if applicable.
+  expressions::ExpressionPtr visitAndTransform(
+      const expressions::ExpressionPtr &expression,
+      const std::size_t max_reference_count,
+      const ScalarCounter &counter,
+      const ScalarHashable &hashable,
+      ScalarMap *substitution_map);
+
+  // Converts a vector of pointers-to-subclass into a vector of generic
+  // expression pointers, preserving order.
+  template <typename ScalarSubclassT>
+  static std::vector<expressions::ExpressionPtr> UpCast(
+      const std::vector<std::shared_ptr<const ScalarSubclassT>> &expressions) {
+    return std::vector<expressions::ExpressionPtr>(expressions.begin(),
+                                                   expressions.end());
+  }
+
+  // Converts a vector of generic expression pointers into pointers to the
+  // given subclass, preserving order. The cast is static (unchecked), so the
+  // caller must know that every element really is a ScalarSubclassT.
+  template <typename ScalarSubclassT>
+  static std::vector<std::shared_ptr<const ScalarSubclassT>> DownCast(
+      const std::vector<expressions::ExpressionPtr> &expressions) {
+    std::vector<std::shared_ptr<const ScalarSubclassT>> casted;
+    for (const expressions::ExpressionPtr &generic : expressions) {
+      casted.push_back(std::static_pointer_cast<const ScalarSubclassT>(generic));
+    }
+    return casted;
+  }
+
+  // Hash functor for the ExpressionType enum: hashes its underlying integer
+  // value.
+  struct ExpressionTypeHash {
+    using UnderT = std::underlying_type<expressions::ExpressionType>::type;
+
+    std::size_t operator()(const expressions::ExpressionType &et) const {
+      return std::hash<UnderT>()(static_cast<UnderT>(et));
+    }
+  };
+
+  // Here we define that an expression type is homogeneous if at execution time
+  // the result column vector of that expression has a one-to-one positional
+  // correspondence with all the result column vectors from its child expressions.
+  // E.g. aggregate functions and CASE expressions are not homogeneous.
+  //
+  // Being homogeneous enables common subexpressions to be hoisted through.
+  // For example, consider the case
+  // --
+  //   (x * 2) + F(x * 2)
+  // --
+  // where F is some unary expression. Then if F is homogeneous, we can mark the
+  // two (x * 2) as common subexpressions. Otherwise, we cannot do that since
+  // the two subexpressions will generate different result column vectors.
+  std::unordered_set<expressions::ExpressionType,
+                     ExpressionTypeHash> homogeneous_expression_types_;
+
+  OptimizerContext *optimizer_context_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExtractCommonSubexpression);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/rules/ReuseAggregateExpressions.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.cpp b/query_optimizer/rules/ReuseAggregateExpressions.cpp
new file mode 100644
index 0000000..79dede6
--- /dev/null
+++ b/query_optimizer/rules/ReuseAggregateExpressions.cpp
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/ReuseAggregateExpressions.hpp"
+
+#include <cstddef>
+#include <list>
+#include <map>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/BinaryExpression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "utility/HashError.hpp"
+
+#include "gflags/gflags.h"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+// Command-line flag: upper bound on the estimated number of groups
+// (default 1000).
+DEFINE_uint64(
+    reuse_aggregate_group_size_threshold,
+    1000u,
+    "The threshold on estimated number of groups for an aggregation "
+    "below which the ReuseAggregateExpressions optimization will be "
+    "performed.");
+
+// Command-line flag: lower bound on the fraction of eliminable columns
+// (default 0.3).
+DEFINE_double(
+    reuse_aggregate_ratio_threshold,
+    0.3,
+    "The threshold on the ratio of (# of eliminable columns) to "
+    "(# of original columns) for an aggregation above which the "
+    "ReuseAggregateExpressions optimization will be performed.");
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+void ReuseAggregateExpressions::init(const P::PhysicalPtr &input) {
+  // The rule must be started on the root of the plan (debug-only check).
+  DCHECK(P::PhysicalType::kTopLevelPlan == input->getPhysicalType());
+
+  // Build a fresh cost model from the shared subplans of the top-level plan,
+  // replacing whatever cost_model_ held before.
+  const auto plan_root = std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(plan_root->shared_subplans()));
+}
+
+P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
+    const P::PhysicalPtr &input) {
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(input, &aggregate)) {
+    return input;
+  }
+
+  const std::vector<E::AliasPtr> &agg_exprs = aggregate->aggregate_expressions();
+
+  // Maps aggregated expressions to AggregationID + positions.
+  //
+  // For example,
+  // --
+  //   SELECT SUM(x+1), AVG(x+1), SUM(x+1), COUNT(*), SUM(y) FROM s;
+  // --
+  // will generate *agg_expr_info* as
+  // --
+  // {
+  //   x+1: { kSum: [0, 2], kAvg: [1] },
+  //   y: { kSum: [4] },
+  // }
+  // --
+  // and COUNT(*) will be recorded inside *count_star_info*.
+  std::unordered_map<E::ScalarPtr,
+                     std::map<AggregationID, std::vector<std::size_t>>,
+                     ScalarHash, ScalarEqual> agg_expr_info;
+  std::list<std::size_t> count_star_info;
+
+  for (std::size_t i = 0; i < agg_exprs.size(); ++i) {
+    DCHECK(agg_exprs[i]->expression()->getExpressionType()
+               == E::ExpressionType::kAggregateFunction);
+    const E::AggregateFunctionPtr agg_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(
+            agg_exprs[i]->expression());
+
+    // Skip DISTINCT aggregations.
+    if (agg_expr->is_distinct()) {
+      continue;
+    }
+
+    const AggregationID agg_id = agg_expr->getAggregate().getAggregationID();
+    const std::vector<E::ScalarPtr> &arguments = agg_expr->getArguments();
+
+    // Currently we only consider aggregate functions with 0 or 1 argument.
+    if (agg_id == AggregationID::kCount) {
+      if (arguments.empty()) {
+        count_star_info.emplace_front(i);
+        continue;
+      } else if (!arguments.front()->getValueType().isNullable()) {
+        // For COUNT(x) where x is not null, we view it as a more general COUNT(*).
+        count_star_info.emplace_back(i);
+        continue;
+      }
+    }
+    if (arguments.size() == 1) {
+      try {
+        agg_expr_info[arguments.front()][agg_id].emplace_back(i);
+      } catch (const HashNotSupported &e) {
+        continue;
+      }
+    }
+  }
+
+  // Now for each aggregate expression, figure out whether we can avoid the
+  // computation by reusing other aggregate expression's result.
+  std::vector<std::unique_ptr<AggregateReference>> agg_refs(agg_exprs.size());
+
+  constexpr std::size_t kInvalidRef = static_cast<std::size_t>(-1);
+  std::size_t count_star_ref;
+
+  // Check whether COUNT(*) is available.
+  if (count_star_info.empty()) {
+    count_star_ref = kInvalidRef;
+  } else {
+    auto it = count_star_info.begin();
+    count_star_ref = *it;
+
+    for (++it; it != count_star_info.end(); ++it) {
+      agg_refs[*it].reset(new AggregateReference(count_star_ref));
+    }
+  }
+
+  // Iterate through agg_expr_info to find all transformation opportunities,
+  // and record them into agg_refs.
+  for (const auto &it : agg_expr_info) {
+    const auto &ref_map = it.second;
+
+    // First, check whether AVG can be reduced to SUM/COUNT.
+    bool is_avg_processed = false;
+
+    const auto avg_it = ref_map.find(AggregationID::kAvg);
+    if (avg_it != ref_map.end()) {
+      std::size_t count_ref = kInvalidRef;
+
+      // To reduce AVG to SUM/COUNT, we need a COUNT available.
+      // TODO(jianqiao): We may even add a new COUNT(*) aggregation if it is not
+      // there. E.g. when there are at least two AVG(...) aggregate functions.
+      if (it.first->getValueType().isNullable()) {
+        const auto count_it = ref_map.find(AggregationID::kCount);
+        if (count_it != ref_map.end()) {
+          DCHECK(!count_it->second.empty());
+          count_ref = count_it->second.front();
+        }
+      } else {
+        count_ref = count_star_ref;
+      }
+
+      if (count_ref != kInvalidRef) {
+        // It is done if there is an existing SUM. Otherwise we strength-reduce
+        // the current AVG to SUM.
+        const auto sum_it = ref_map.find(AggregationID::kSum);
+        const std::size_t sum_ref =
+            sum_it == ref_map.end() ? kInvalidRef : sum_it->second.front();
+
+        for (const std::size_t idx : avg_it->second) {
+          agg_refs[idx].reset(new AggregateReference(sum_ref, count_ref));
+        }
+        is_avg_processed = true;
+      }
+    }
+
+    // Then, identify duplicate aggregate expressions.
+    for (const auto &ref_it : ref_map) {
+      if (ref_it.first == AggregationID::kAvg && is_avg_processed) {
+        continue;
+      }
+      const auto &indices = ref_it.second;
+      DCHECK(!indices.empty());
+      const std::size_t ref = indices.front();
+      for (std::size_t i = 1; i < indices.size(); ++i) {
+        agg_refs[indices[i]].reset(new AggregateReference(ref));
+      }
+    }
+  }
+
+  // Count the number of eliminable aggregate expressions.
+  std::size_t num_eliminable = 0;
+  for (const auto &agg_ref : agg_refs) {
+    if (agg_ref != nullptr) {
+      ++num_eliminable;
+    }
+  }
+
+  if (num_eliminable == 0) {
+    return input;
+  }
+
+  // Now we need to make a decision since it is not always beneficial to perform
+  // the transformation. Currently we employ a simple heuristic that if either
+  // (1) The estimated number of groups for this Aggregate node is smaller than
+  //     the specified FLAGS_reuse_aggregate_group_size_threshold.
+  // or
+  // (2) The ratio of (# of eliminable columns) to (# of original columns) is
+  //     greater than the specified FLAGS_reuse_aggregate_ratio_threshold.
+  // then we perform the transformation.
+  const bool is_group_size_condition_satisfied =
+      cost_model_->estimateNumGroupsForAggregate(aggregate)
+          < FLAGS_reuse_aggregate_group_size_threshold;
+  const bool is_ratio_condition_satisfied =
+      static_cast<double>(num_eliminable) / agg_exprs.size()
+          > FLAGS_reuse_aggregate_ratio_threshold;
+
+  if (!is_group_size_condition_satisfied && !is_ratio_condition_satisfied) {
+    return input;
+  }
+
+  // Now we transform the original Aggregate to a reduced Aggregate + a wrapping
+  // Selection.
+
+  // Aggregate expressions for the new Aggregate.
+  std::vector<E::AliasPtr> new_agg_exprs;
+
+  // Project expressions for the new Selection.
+  std::vector<E::NamedExpressionPtr> new_select_exprs;
+
+  for (const auto &grouping_expr : aggregate->grouping_expressions()) {
+    new_select_exprs.emplace_back(E::ToRef(grouping_expr));
+  }
+
+  const std::vector<E::AttributeReferencePtr> agg_attrs = E::ToRefVector(agg_exprs);
+
+  for (std::size_t i = 0; i < agg_refs.size(); ++i) {
+    const auto &agg_ref = agg_refs[i];
+    const E::AliasPtr &agg_expr = agg_exprs[i];
+
+    if (agg_ref == nullptr) {
+      // Case 1: this aggregate expression can not be eliminated.
+      new_agg_exprs.emplace_back(agg_expr);
+      new_select_exprs.emplace_back(
+          E::AttributeReference::Create(agg_expr->id(),
+                                        agg_expr->attribute_name(),
+                                        agg_expr->attribute_alias(),
+                                        agg_expr->relation_name(),
+                                        agg_expr->getValueType(),
+                                        E::AttributeReferenceScope::kLocal));
+    } else {
+      switch (agg_ref->kind) {
+        // Case 2.1: this aggregate expression can be eliminated.
+        case AggregateReference::kDirect: {
+          new_select_exprs.emplace_back(
+              E::Alias::Create(agg_expr->id(),
+                               agg_attrs[agg_ref->first_ref],
+                               agg_expr->attribute_name(),
+                               agg_expr->attribute_alias(),
+                               agg_expr->relation_name()));
+          break;
+        }
+        // Case 2.2: this aggregate expression is an AVG.
+        case AggregateReference::kAvg: {
+          E::AttributeReferencePtr sum_attr;
+          if (agg_ref->first_ref == kInvalidRef) {
+            // Case 2.2.1: If there is no existing SUM, we need to convert this
+            //             AVG to SUM.
+            const E::AggregateFunctionPtr avg_expr =
+                std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+            const AggregateFunction &sum_func =
+                AggregateFunctionFactory::Get(AggregationID::kSum);
+            const E::AggregateFunctionPtr sum_expr =
+                E::AggregateFunction::Create(sum_func,
+                                             avg_expr->getArguments(),
+                                             avg_expr->is_vector_aggregate(),
+                                             avg_expr->is_distinct());
+            new_agg_exprs.emplace_back(
+                E::Alias::Create(optimizer_context_->nextExprId(),
+                                 sum_expr,
+                                 agg_expr->attribute_name(),
+                                 agg_expr->attribute_alias(),
+                                 agg_expr->relation_name()));
+
+            sum_attr = E::ToRef(new_agg_exprs.back());
+          } else {
+            // Case 2.2.2: If there is a SUM somewhere, we just eliminate this
+            //             AVG and use the result from that SUM.
+            sum_attr = agg_attrs[agg_ref->first_ref];
+          }
+
+          // Obtain AVG by evaluating SUM/COUNT in Selection.
+          const BinaryOperation &divide_op =
+              BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+          const E::BinaryExpressionPtr avg_expr =
+              E::BinaryExpression::Create(divide_op,
+                                          sum_attr,
+                                          agg_attrs[agg_ref->second_ref]);
+          new_select_exprs.emplace_back(
+              E::Alias::Create(agg_expr->id(),
+                               avg_expr,
+                               agg_expr->attribute_name(),
+                               agg_expr->attribute_alias(),
+                               agg_expr->relation_name()));
+        }
+      }
+    }
+  }
+
+  // Construct the reduced Aggregate.
+  const P::AggregatePtr new_aggregate =
+      P::Aggregate::Create(aggregate->input(),
+                           aggregate->grouping_expressions(),
+                           new_agg_exprs,
+                           aggregate->filter_predicate());
+
+  // Construct the wrapping Selection.
+  return P::Selection::Create(new_aggregate, new_select_exprs, nullptr);
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/rules/ReuseAggregateExpressions.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.hpp b/query_optimizer/rules/ReuseAggregateExpressions.hpp
new file mode 100644
index 0000000..182e9d9
--- /dev/null
+++ b/query_optimizer/rules/ReuseAggregateExpressions.hpp
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to eliminate duplicate aggregate
+ *        expressions and convert AVG to SUM/COUNT if appropriate.
+ *
+ * This rule rewrites Aggregate to Selection + Aggregate to eliminate duplicates
+ * and strength-reduce AVG functions. E.g.
+ * --
+ *   SELECT SUM(x), SUM(y), SUM(x) * 2, AVG(y), COUNT(*)
+ *   FROM s;
+ * --
+ * will be rewritten as (assume y is not null)
+ * --
+ *   SELECT sum_x, sum_y, sum_x * 2, sum_y / cnt, cnt
+ *   FROM (
+ *     SELECT SUM(x) AS sum_x, SUM(y) AS sum_y, COUNT(*) AS cnt FROM s
+ *   ) t;
+ * --
+ *
+ * Meanwhile, note that currently it is not free-of-cost to "re-project" the
+ * columns. So it may not be worth doing the transformation in some situations.
+ * E.g. it may actually slow down the query to rewrite
+ * --
+ *   SELECT SUM(a), SUM(b), SUM(c), SUM(d), SUM(a)
+ *   FROM s
+ *   GROUP BY x;
+ * --
+ * as
+ * --
+ *   SELECT sum_a, sum_b, sum_c, sum_d, sum_a
+ *   FROM (
+ *     SELECT SUM(a) AS sum_a, SUM(b) AS sum_b, SUM(c) AS sum_c, SUM(d) AS sum_d
+ *     FROM s
+ *     GROUP BY x
+ *   ) t;
+ * --
+ * if the number of distinct values of attribute x is large -- in this case, we
+ * avoid one duplicate computation of SUM(a), but introduce 5 extra expensive
+ * column copies with the outer Selection.
+ *
+ * To address the above problem, currently we employ a simple heuristic that if
+ * either
+ * (1) The estimated number of groups for this Aggregate node is smaller than
+ *     the specified FLAGS_reuse_aggregate_group_size_threshold.
+ * or
+ * (2) The ratio of (# of eliminable columns) to (# of original columns) is
+ *     greater than the specified FLAGS_reuse_aggregate_ratio_threshold.
+ * then we perform the transformation.
+ */
+class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor. Stores the given context pointer in this rule.
+   *
+   * @param optimizer_context The optimizer context.
+   */
+  explicit ReuseAggregateExpressions(OptimizerContext *optimizer_context)
+      : optimizer_context_(optimizer_context) {}
+
+  std::string getName() const override {  // Returns the fixed rule name.
+    return "ReuseAggregateExpressions";
+  }
+
+ protected:
+  void init(const physical::PhysicalPtr &input) override;
+
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &input) override;
+
+ private:
+  struct ScalarHash {  // Hashes a ScalarPtr via the pointee's hash().
+    inline std::size_t operator()(const expressions::ScalarPtr &scalar) const {
+      return scalar->hash();
+    }
+  };
+
+  struct ScalarEqual {  // Compares two ScalarPtrs via lhs->equals(rhs).
+    inline bool operator()(const expressions::ScalarPtr &lhs,
+                           const expressions::ScalarPtr &rhs) const {
+      return lhs->equals(rhs);
+    }
+  };
+
+  // Describes how one aggregate expression can be eliminated: by referring to
+  // another identical expression (kDirect), or, for an AVG, by dividing a SUM
+  // by a COUNT (kAvg). The *_ref fields are indices of aggregate expressions.
+  struct AggregateReference {
+    enum Kind {
+      kDirect = 0,  // Reuse the result of the expression at first_ref as is.
+      kAvg          // Compute as (SUM at first_ref) / (COUNT at second_ref).
+    };
+
+    explicit AggregateReference(const std::size_t ref)
+        : kind(kDirect), first_ref(ref), second_ref(0) {}
+
+    AggregateReference(const std::size_t sum_ref, const std::size_t count_ref)
+        : kind(kAvg), first_ref(sum_ref), second_ref(count_ref) {}
+
+    const Kind kind;
+    const std::size_t first_ref;   // kDirect: the target; kAvg: the SUM.
+    const std::size_t second_ref;  // kAvg: the COUNT; 0 (unused) for kDirect.
+  };
+
+  OptimizerContext *optimizer_context_;  // Set by the constructor.
+  // Cost model; left null by the constructor.
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(ReuseAggregateExpressions);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/tests/execution_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 40629ee..595d09d 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -15,6 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+add_test(quickstep_queryoptimizer_tests_executiongenerator_commonsubexpression
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/CommonSubexpression.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression/")
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -77,6 +82,11 @@ add_test(quickstep_queryoptimizer_tests_executiongenerator_update
          "${CMAKE_CURRENT_BINARY_DIR}/Update/")
 
 if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_commonsubexpression_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/CommonSubexpression.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpressionDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpressionDistributed/")
   add_test(quickstep_queryoptimizer_tests_executiongenerator_create_distributed
            "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
            "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -141,6 +151,7 @@ endif(ENABLE_DISTRIBUTED)
 
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
@@ -155,6 +166,7 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/TableGenerator)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
 
 if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpressionDistributed)
   file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed)
   file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed)
   file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/tests/execution_generator/CommonSubexpression.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/CommonSubexpression.test b/query_optimizer/tests/execution_generator/CommonSubexpression.test
new file mode 100644
index 0000000..e0b5e2d
--- /dev/null
+++ b/query_optimizer/tests/execution_generator/CommonSubexpression.test
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+SELECT int_col+1, 1+int_col
+FROM test
+ORDER BY int_col
+LIMIT 5;
+--
++-----------+-----------+
+|(int_col+1)|(1+int_col)|
++-----------+-----------+
+|        -22|        -22|
+|        -20|        -20|
+|        -18|        -18|
+|        -16|        -16|
+|        -14|        -14|
++-----------+-----------+
+==
+
+SELECT SUM(int_col+long_col), SUM((long_col+int_col)*2)
+FROM test;
+--
++-----------------------+---------------------------+
+|SUM((int_col+long_col))|SUM(((long_col+int_col)*2))|
++-----------------------+---------------------------+
+|                   4382|                       8764|
++-----------------------+---------------------------+
+==
+
+SELECT SUM(long_col+1), SUM(long_col+1)*2, AVG(long_col+1), AVG(long_col+1)*2, COUNT(*)
+FROM test;
+--
++--------------------+---------------------+--------------------+---------------------+--------------------+
+|SUM((long_col+1))   |(SUM((long_col+1))*2)|AVG((long_col+1))   |(AVG((long_col+1))*2)|COUNT(*)            |
++--------------------+---------------------+--------------------+---------------------+--------------------+
+|                4925|                 9850|                 197|                  394|                  25|
++--------------------+---------------------+--------------------+---------------------+--------------------+
+==

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/tests/physical_generator/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/physical_generator/CMakeLists.txt b/query_optimizer/tests/physical_generator/CMakeLists.txt
index c752cdd..fc3390e 100644
--- a/query_optimizer/tests/physical_generator/CMakeLists.txt
+++ b/query_optimizer/tests/physical_generator/CMakeLists.txt
@@ -15,6 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+add_test(quickstep_queryoptimizer_tests_physicalgenerator_commonsubexpression
+         "../quickstep_queryoptimizer_tests_OptimizerTextTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/CommonSubexpression.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression.test")
 add_test(quickstep_queryoptimizer_tests_physicalgenerator_copy
          "../quickstep_queryoptimizer_tests_OptimizerTextTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Copy.test"

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8169306c/query_optimizer/tests/physical_generator/CommonSubexpression.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/physical_generator/CommonSubexpression.test b/query_optimizer/tests/physical_generator/CommonSubexpression.test
new file mode 100644
index 0000000..b23a97d
--- /dev/null
+++ b/query_optimizer/tests/physical_generator/CommonSubexpression.test
@@ -0,0 +1,772 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[default optimized_logical_plan physical_plan]
+
+SELECT int_col+1, 1+int_col
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+|     +-Add
+|       +-Literal[value=1,type=Int]
+|       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+|   | +-CommonSubexpression[common_subexpression_id=8]
+|   |   +-Operand=Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+|     +-CommonSubexpression[common_subexpression_id=8]
+|       +-Operand=Add
+|         +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|         +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+==
+
+# Aggregate
+SELECT SUM(int_col+float_col), MIN((float_col+int_col)*2)
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   |     +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Float NULL]
+| |     +-AggregateFunction[function=MIN]
+| |       +-Multiply
+| |         +-Add
+| |         | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |         | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |         +-Literal[value=2,type=Int]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=SUM((int_col+float_col)),relation=,type=Double NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Double NULL]
+|   +-Alias[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+|     type=Float NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Float NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((int_col+float_col)),relation=,
+  | type=Double NULL]
+  +-AttributeReference[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+    type=Float NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-CommonSubexpression[common_subexpression_id=8]
+| |   |     +-Operand=Add
+| |   |       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   |       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Float NULL]
+| |     +-AggregateFunction[function=MIN]
+| |       +-Multiply
+| |         +-CommonSubexpression[common_subexpression_id=8]
+| |         | +-Operand=Add
+| |         |   +-AttributeReference[id=0,name=int_col,relation=test,
+| |         |   | type=Int NULL]
+| |         |   +-AttributeReference[id=2,name=float_col,relation=test,
+| |         |     type=Float]
+| |         +-Literal[value=2,type=Int]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=SUM((int_col+float_col)),relation=,type=Double NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Double NULL]
+|   +-Alias[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+|     type=Float NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Float NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((int_col+float_col)),relation=,
+  | type=Double NULL]
+  +-AttributeReference[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+    type=Float NULL]
+==
+
+# HashJoin
+SELECT int_col + j, (int_col + j) * float_col
+FROM test, (SELECT i, i * i AS j FROM generate_series(1, 10) AS g(i)) t
+WHERE int_col = i AND (int_col + i) < float_col;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=HashJoin
+| | | +-left=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-right=Project
+| | | | +-input=TableGenerator[function_name=generate_series,table_alias=g]
+| | | | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | | |   relation=generate_series,type=Int]
+| | | | +-project_list=
+| | | |   +-Alias[id=7,name=i,relation=,type=Int]
+| | | |   | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |   |   relation=generate_series,type=Int]
+| | | |   +-Alias[id=8,name=j,relation=t,type=Int]
+| | | |     +-Multiply
+| | | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |       | relation=generate_series,type=Int]
+| | | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |         relation=generate_series,type=Int]
+| | | +-left_join_attributes=
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-right_join_attributes=
+| | |   +-AttributeReference[id=7,name=i,relation=,type=Int]
+| | +-filter_predicate=Less
+| |   +-Add
+| |   | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   | +-AttributeReference[id=7,name=i,relation=,type=Int]
+| |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_list=
+|   +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|   +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+|     +-Multiply
+|       +-Add
+|       | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=HashJoin
+| +-left=TableGenerator[function_name=generate_series,table_alias=g]
+| | +-AttributeReference[id=6,name=generate_series,alias=g,
+| |   relation=generate_series,type=Int]
+| +-right=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-residual_predicate=Less
+| | +-Add
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |   relation=generate_series,type=Int]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_expressions=
+| | +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+| | | +-CommonSubexpression[common_subexpression_id=11]
+| | |   +-Operand=Add
+| | |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |     +-Multiply
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |       | relation=generate_series,type=Int]
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |         relation=generate_series,type=Int]
+| | +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+| |   +-Multiply
+| |     +-CommonSubexpression[common_subexpression_id=11]
+| |     | +-Operand=Add
+| |     |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |     |   +-Multiply
+| |     |     +-AttributeReference[id=6,name=generate_series,alias=g,
+| |     |     | relation=generate_series,type=Int]
+| |     |     +-AttributeReference[id=6,name=generate_series,alias=g,
+| |     |       relation=generate_series,type=Int]
+| |     +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-left_join_attributes=
+| | +-AttributeReference[id=6,name=generate_series,alias=g,
+| |   relation=generate_series,type=Int]
+| +-right_join_attributes=
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+==
+
+# NestedLoopsJoin
+SELECT int_col + j, (int_col + j) * float_col
+FROM test, (SELECT i, i * i AS j FROM generate_series(1, 10) AS g(i)) t
+WHERE (int_col + i) < float_col;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=NestedLoopsJoin
+| | +-left=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-right=Project
+| | | +-input=TableGenerator[function_name=generate_series,table_alias=g]
+| | | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |   relation=generate_series,type=Int]
+| | | +-project_list=
+| | |   +-Alias[id=7,name=i,relation=,type=Int]
+| | |   | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |   |   relation=generate_series,type=Int]
+| | |   +-Alias[id=8,name=j,relation=t,type=Int]
+| | |     +-Multiply
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |       | relation=generate_series,type=Int]
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |         relation=generate_series,type=Int]
+| | +-join_predicate=Less
+| |   +-Add
+| |   | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   | +-AttributeReference[id=7,name=i,relation=,type=Int]
+| |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_list=
+|   +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|   +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+|     +-Multiply
+|       +-Add
+|       | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=NestedLoopsJoin
+| +-left=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-right=TableGenerator[function_name=generate_series,table_alias=g]
+| | +-AttributeReference[id=6,name=generate_series,alias=g,
+| |   relation=generate_series,type=Int]
+| +-join_predicate=Less
+| | +-Add
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |   relation=generate_series,type=Int]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_expressions=
+|   +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+|   | +-CommonSubexpression[common_subexpression_id=11]
+|   |   +-Operand=Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Multiply
+|   |       +-AttributeReference[id=6,name=generate_series,alias=g,
+|   |       | relation=generate_series,type=Int]
+|   |       +-AttributeReference[id=6,name=generate_series,alias=g,
+|   |         relation=generate_series,type=Int]
+|   +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+|     +-Multiply
+|       +-CommonSubexpression[common_subexpression_id=11]
+|       | +-Operand=Add
+|       |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       |   +-Multiply
+|       |     +-AttributeReference[id=6,name=generate_series,alias=g,
+|       |     | relation=generate_series,type=Int]
+|       |     +-AttributeReference[id=6,name=generate_series,alias=g,
+|       |       relation=generate_series,type=Int]
+|       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+==
+
+# Case expression
+SELECT long_col+1,
+       CASE WHEN int_col = 1 THEN (long_col+1)*(long_col+1)
+            WHEN int_col = 2 THEN (long_col+1)*(long_col+1)
+            ELSE long_col+1 END AS result
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=(long_col+1),relation=,type=Long]
+|   | +-Add
+|   |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   |   +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=result,relation=,type=Long]
+|     +-SearchedCase
+|       +-else_result_expression=Add
+|       | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|       | +-Literal[value=1,type=Long]
+|       +-condition_perdicates=
+|       | +-Equal
+|       | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | | +-Literal[value=1,type=Int]
+|       | +-Equal
+|       |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       |   +-Literal[value=2,type=Int]
+|       +-conditional_result_expressions=
+|         +-Multiply
+|         | +-Add
+|         | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         | | +-Literal[value=1,type=Int]
+|         | +-Add
+|         |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         |   +-Literal[value=1,type=Int]
+|         +-Multiply
+|           +-Add
+|           | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|           | +-Literal[value=1,type=Int]
+|           +-Add
+|             +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|             +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(long_col+1),relation=,type=Long]
+  +-AttributeReference[id=7,name=result,relation=,type=Long]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=(long_col+1),relation=,type=Long]
+|   | +-Add
+|   |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   |   +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=result,relation=,type=Long]
+|     +-SearchedCase
+|       +-else_result_expression=Add
+|       | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|       | +-Literal[value=1,type=Long]
+|       +-condition_perdicates=
+|       | +-Equal
+|       | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | | +-Literal[value=1,type=Int]
+|       | +-Equal
+|       |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       |   +-Literal[value=2,type=Int]
+|       +-conditional_result_expressions=
+|         +-Multiply
+|         | +-CommonSubexpression[common_subexpression_id=8]
+|         | | +-Operand=Add
+|         | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         | |   +-Literal[value=1,type=Int]
+|         | +-CommonSubexpression[common_subexpression_id=8]
+|         |   +-Operand=Add
+|         |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         |     +-Literal[value=1,type=Int]
+|         +-Multiply
+|           +-CommonSubexpression[common_subexpression_id=9]
+|           | +-Operand=Add
+|           |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|           |   +-Literal[value=1,type=Int]
+|           +-CommonSubexpression[common_subexpression_id=9]
+|             +-Operand=Add
+|               +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|               +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(long_col+1),relation=,type=Long]
+  +-AttributeReference[id=7,name=result,relation=,type=Long]
+==
+
+SELECT (int_col+1)*(int_col+2)*(int_col+3), (int_col+1)*(int_col+2), int_col+1
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),relation=,
+|   | type=Int NULL]
+|   | +-Multiply
+|   |   +-Multiply
+|   |   | +-Add
+|   |   | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   | | +-Literal[value=1,type=Int]
+|   |   | +-Add
+|   |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   |   +-Literal[value=2,type=Int]
+|   |   +-Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=3,type=Int]
+|   +-Alias[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,type=Int NULL]
+|   | +-Multiply
+|   |   +-Add
+|   |   | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   | +-Literal[value=1,type=Int]
+|   |   +-Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=2,type=Int]
+|   +-Alias[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+|     +-Add
+|       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),
+  | relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,
+  | type=Int NULL]
+  +-AttributeReference[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),relation=,
+|   | type=Int NULL]
+|   | +-Multiply
+|   |   +-CommonSubexpression[common_subexpression_id=10]
+|   |   | +-Operand=Multiply
+|   |   |   +-CommonSubexpression[common_subexpression_id=9]
+|   |   |   | +-Operand=Add
+|   |   |   |   +-AttributeReference[id=0,name=int_col,relation=test,
+|   |   |   |   | type=Int NULL]
+|   |   |   |   +-Literal[value=1,type=Int]
+|   |   |   +-Add
+|   |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   |     +-Literal[value=2,type=Int]
+|   |   +-Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=3,type=Int]
+|   +-Alias[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,type=Int NULL]
+|   | +-CommonSubexpression[common_subexpression_id=10]
+|   |   +-Operand=Multiply
+|   |     +-CommonSubexpression[common_subexpression_id=9]
+|   |     | +-Operand=Add
+|   |     |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     |   +-Literal[value=1,type=Int]
+|   |     +-Add
+|   |       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |       +-Literal[value=2,type=Int]
+|   +-Alias[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+|     +-CommonSubexpression[common_subexpression_id=9]
+|       +-Operand=Add
+|         +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|         +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),
+  | relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,
+  | type=Int NULL]
+  +-AttributeReference[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+==
+
+# Reuse aggregate expressions
+SELECT SUM(long_col+1), AVG(1+long_col), MIN(float_col), MIN(float_col)*2, COUNT(*)
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-Literal[value=1,type=Int]
+| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=AVG]
+| |   |   +-Add
+| |   |     +-Literal[value=1,type=Int]
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Float NULL]
+| |   | +-AggregateFunction[function=MIN]
+| |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,type=Float NULL]
+| |   | +-AggregateFunction[function=MIN]
+| |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=11,name=,alias=$aggregate4,relation=$aggregate,type=Long]
+| |     +-AggregateFunction[function=COUNT]
+| |       +-[]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=SUM((long_col+1)),relation=,type=Long NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long NULL]
+|   +-Alias[id=7,name=,alias=AVG((1+long_col)),relation=,type=Double NULL]
+|   | +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|   |   type=Double NULL]
+|   +-Alias[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+|   | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+|   |   type=Float NULL]
+|   +-Alias[id=10,name=,alias=(MIN(float_col)*2),relation=,type=Float NULL]
+|   | +-Multiply
+|   |   +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+|   |   | type=Float NULL]
+|   |   +-Literal[value=2,type=Int]
+|   +-Alias[id=11,name=,alias=COUNT(*),relation=,type=Long]
+|     +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
+|       type=Long]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((long_col+1)),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=7,name=,alias=AVG((1+long_col)),relation=,
+  | type=Double NULL]
+  +-AttributeReference[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+  +-AttributeReference[id=10,name=,alias=(MIN(float_col)*2),relation=,
+  | type=Float NULL]
+  +-AttributeReference[id=11,name=,alias=COUNT(*),relation=,type=Long]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-Literal[value=1,type=Int]
+| |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Float NULL]
+| |   | +-AggregateFunction[function=MIN]
+| |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=11,name=,alias=$aggregate4,relation=$aggregate,type=Long]
+| |     +-AggregateFunction[function=COUNT]
+| |       +-[]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=SUM((long_col+1)),relation=,type=Long NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long NULL]
+|   +-Alias[id=7,name=,alias=AVG((1+long_col)),relation=,type=Long NULL]
+|   | +-Divide
+|   |   +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   | type=Long NULL]
+|   |   +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
+|   |     type=Long]
+|   +-Alias[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+|   | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+|   |   type=Float NULL]
+|   +-Alias[id=10,name=,alias=(MIN(float_col)*2),relation=,type=Float NULL]
+|   | +-Multiply
+|   |   +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+|   |   | type=Float NULL]
+|   |   +-Literal[value=2,type=Int]
+|   +-Alias[id=11,name=,alias=COUNT(*),relation=,type=Long]
+|     +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
+|       type=Long]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((long_col+1)),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=7,name=,alias=AVG((1+long_col)),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+  +-AttributeReference[id=10,name=,alias=(MIN(float_col)*2),relation=,
+  | type=Float NULL]
+  +-AttributeReference[id=11,name=,alias=COUNT(*),relation=,type=Long]
+==
+
+SELECT SUM(long_col+1)+2, (SUM(long_col+1)+2)*3
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-Literal[value=1,type=Int]
+| |   +-Alias[id=8,name=,alias=$aggregate1,relation=$aggregate,type=Long NULL]
+| |     +-AggregateFunction[function=SUM]
+| |       +-Add
+| |         +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |         +-Literal[value=1,type=Int]
+| +-project_list=
+|   +-Alias[id=7,name=,alias=(SUM((long_col+1))+2),relation=,type=Long NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   | type=Long NULL]
+|   |   +-Literal[value=2,type=Int]
+|   +-Alias[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,type=Long NULL]
+|     +-Multiply
+|       +-Add
+|       | +-AttributeReference[id=8,name=,alias=$aggregate1,relation=$aggregate,
+|       | | type=Long NULL]
+|       | +-Literal[value=2,type=Int]
+|       +-Literal[value=3,type=Int]
++-output_attributes=
+  +-AttributeReference[id=7,name=,alias=(SUM((long_col+1))+2),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,
+    type=Long NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |     +-AggregateFunction[function=SUM]
+| |       +-Add
+| |         +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |         +-Literal[value=1,type=Int]
+| +-project_expressions=
+|   +-Alias[id=7,name=,alias=(SUM((long_col+1))+2),relation=,type=Long NULL]
+|   | +-CommonSubexpression[common_subexpression_id=10]
+|   |   +-Operand=Add
+|   |     +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |     | type=Long NULL]
+|   |     +-Literal[value=2,type=Int]
+|   +-Alias[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,type=Long NULL]
+|     +-Multiply
+|       +-CommonSubexpression[common_subexpression_id=10]
+|       | +-Operand=Add
+|       |   +-AttributeReference[id=6,name=,alias=$aggregate0,
+|       |   | relation=$aggregate,type=Long NULL]
+|       |   +-Literal[value=2,type=Int]
+|       +-Literal[value=3,type=Int]
++-output_attributes=
+  +-AttributeReference[id=7,name=,alias=(SUM((long_col+1))+2),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,
+    type=Long NULL]
+==