You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/17 18:48:45 UTC

incubator-quickstep git commit: Added Partition Rule for HashJoin.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 1c749c911 -> 4b8963493


Added Partition Rule for HashJoin.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4b896349
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4b896349
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4b896349

Branch: refs/heads/master
Commit: 4b8963493b8ebcce38ea1d1cc0a6392dbbae26e2
Parents: 1c749c9
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jun 13 21:50:41 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Fri Jun 16 17:17:06 2017 -0500

----------------------------------------------------------------------
 cli/distributed/QuickstepDistributedCli.cpp     |   5 +
 query_optimizer/CMakeLists.txt                  |   1 +
 query_optimizer/ExecutionGenerator.cpp          |   1 -
 query_optimizer/PhysicalGenerator.cpp           |  11 +
 query_optimizer/physical/FilterJoin.cpp         |   4 +-
 query_optimizer/physical/FilterJoin.hpp         |  26 +-
 query_optimizer/physical/Physical.hpp           |  20 +-
 query_optimizer/rules/CMakeLists.txt            |  25 ++
 query_optimizer/rules/InjectJoinFilters.cpp     |   9 +-
 query_optimizer/rules/Partition.cpp             | 297 +++++++++++++++++
 query_optimizer/rules/Partition.hpp             | 106 ++++++
 query_optimizer/rules/tests/CMakeLists.txt      |   4 +
 .../rules/tests/Partition_unittest.cpp          | 326 +++++++++++++++++++
 .../tests/execution_generator/Partition.test    |  85 ++++-
 14 files changed, 891 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
index 513bedd..e8fa6a1 100644
--- a/cli/distributed/QuickstepDistributedCli.cpp
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -69,6 +69,11 @@ using quickstep::FLAGS_role;
 int main(int argc, char *argv[]) {
   google::InitGoogleLogging(argv[0]);
 
+  // TODO(quickstep-team): Fix JIRA QUICKSTEP-94 for adding LIP filter support
+  // with partitioned inputs in the distributed version.
+  quickstep::optimizer::FLAGS_use_lip_filters = false;
+  quickstep::optimizer::FLAGS_use_filter_joins = false;
+
   gflags::ParseCommandLineFlags(&argc, &argv, true);
   grpc_init();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 04af02c..564c5c8 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -220,6 +220,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_rules_ExtractCommonSubexpression
                       quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_InjectJoinFilters
+                      quickstep_queryoptimizer_rules_Partition
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
                       quickstep_queryoptimizer_rules_ReduceGroupByAttributes

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index a7c7328..7963d97 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -850,7 +850,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
       probe_partition_scheme_header ? probe_partition_scheme_header->num_partitions : 1u;
   hash_table_context_proto->set_num_partitions(probe_num_partitions);
 
-
   S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table();
 
   // SimplifyHashTableImplTypeProto() switches the hash table implementation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 5f3b1b7..6932b30 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -31,6 +31,7 @@
 #include "query_optimizer/rules/ExtractCommonSubexpression.hpp"
 #include "query_optimizer/rules/FuseAggregateJoin.hpp"
 #include "query_optimizer/rules/InjectJoinFilters.hpp"
+#include "query_optimizer/rules/Partition.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
 #include "query_optimizer/rules/ReduceGroupByAttributes.hpp"
@@ -62,6 +63,10 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is suitable "
             "for queries on star-schema tables.");
 
+DEFINE_bool(use_partition_rule, true,
+            "If true, apply an optimization to support partitioned inputs. The "
+            "optimization may add additional Selection for repartitioning.");
+
 DEFINE_bool(use_filter_joins, true,
             "If true, apply an optimization that strength-reduces HashJoins to "
             "FilterJoins (implemented as LIPFilters attached to some anchoring "
@@ -164,6 +169,12 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   // common subexpression evaluated only once.
   rules.emplace_back(new ExtractCommonSubexpression(optimizer_context_));
 
+  // This optimization pass may add extra Selection nodes for repartitioning,
+  // and set the output PartitionSchemeHeader of a physical plan node as needed.
+  if (FLAGS_use_partition_rule) {
+    rules.push_back(std::make_unique<Partition>(optimizer_context_));
+  }
+
   // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters)
   // requires extra handling of LIPFilterConfiguration for transformed nodes.
   // So currently it is suggested that all the new rules be placed before this

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/physical/FilterJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/FilterJoin.cpp b/query_optimizer/physical/FilterJoin.cpp
index 1817a1c..ca46791 100644
--- a/query_optimizer/physical/FilterJoin.cpp
+++ b/query_optimizer/physical/FilterJoin.cpp
@@ -26,6 +26,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/physical/Physical.hpp"
 #include "utility/Cast.hpp"
 
 namespace quickstep {
@@ -76,7 +77,8 @@ bool FilterJoin::maybeCopyWithPrunedExpressions(
                      build_attributes_,
                      new_project_expressions,
                      build_side_filter_predicate_,
-                     is_anti_join_);
+                     is_anti_join_,
+                     cloneOutputPartitionSchemeHeader());
     return true;
   }
   return false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/physical/FilterJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/FilterJoin.hpp b/query_optimizer/physical/FilterJoin.hpp
index ad4e18b..7b55942 100644
--- a/query_optimizer/physical/FilterJoin.hpp
+++ b/query_optimizer/physical/FilterJoin.hpp
@@ -47,6 +47,8 @@ namespace physical {
 class FilterJoin;
 typedef std::shared_ptr<const FilterJoin> FilterJoinPtr;
 
+struct PartitionSchemeHeader;
+
 /**
  * @brief Physical filter join node. Semantically, FilterJoin is similar to
  *        HashJoin where the difference is that FilterJoin builds a bit vector
@@ -106,7 +108,8 @@ class FilterJoin : public BinaryJoin {
                   build_attributes_,
                   project_expressions(),
                   build_side_filter_predicate_,
-                  is_anti_join_);
+                  is_anti_join_,
+                  cloneOutputPartitionSchemeHeader());
   }
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
@@ -125,6 +128,8 @@ class FilterJoin : public BinaryJoin {
    * @param build_side_filter_predicate Optional filtering predicate to be
    *        applied to the build side child BEFORE join.
    * @param is_anti_join Whether this is an anti-join.
+   * @param partition_scheme_header The optional output partition scheme header.
+   *
    * @return An immutable physical FilterJoin.
    */
   static FilterJoinPtr Create(
@@ -134,7 +139,8 @@ class FilterJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &build_attributes,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
       const expressions::PredicatePtr &build_side_filter_predicate,
-      const bool is_anti_join) {
+      const bool is_anti_join,
+      PartitionSchemeHeader *partition_scheme_header = nullptr) {
     return FilterJoinPtr(
         new FilterJoin(probe_child,
                        build_child,
@@ -142,7 +148,8 @@ class FilterJoin : public BinaryJoin {
                        build_attributes,
                        project_expressions,
                        build_side_filter_predicate,
-                       is_anti_join));
+                       is_anti_join,
+                       partition_scheme_header));
   }
 
  protected:
@@ -162,18 +169,19 @@ class FilterJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &build_attributes,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
       const expressions::PredicatePtr &build_side_filter_predicate,
-      const bool is_anti_join)
-      : BinaryJoin(probe_child, build_child, project_expressions),
+      const bool is_anti_join,
+      PartitionSchemeHeader *partition_scheme_header)
+      : BinaryJoin(probe_child, build_child, project_expressions, partition_scheme_header),
         probe_attributes_(probe_attributes),
         build_attributes_(build_attributes),
         build_side_filter_predicate_(build_side_filter_predicate),
         is_anti_join_(is_anti_join) {
   }
 
-  std::vector<expressions::AttributeReferencePtr> probe_attributes_;
-  std::vector<expressions::AttributeReferencePtr> build_attributes_;
-  expressions::PredicatePtr build_side_filter_predicate_;
-  bool is_anti_join_;
+  const std::vector<expressions::AttributeReferencePtr> probe_attributes_;
+  const std::vector<expressions::AttributeReferencePtr> build_attributes_;
+  const expressions::PredicatePtr build_side_filter_predicate_;
+  const bool is_anti_join_;
 
   DISALLOW_COPY_AND_ASSIGN(FilterJoin);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/physical/Physical.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp
index 2279a84..a3a56d0 100644
--- a/query_optimizer/physical/Physical.hpp
+++ b/query_optimizer/physical/Physical.hpp
@@ -114,16 +114,6 @@ class Physical : public OptimizerTree<Physical> {
     return partition_scheme_header_.get();
   }
 
- protected:
-  /**
-   * @brief Constructor.
-   *
-   * @param partition_scheme_header The partition scheme header of the relation.
-   *        The constructor takes ownership of 'partition_scheme_header'.
-   */
-  explicit Physical(PartitionSchemeHeader *partition_scheme_header = nullptr)
-      : partition_scheme_header_(partition_scheme_header) {}
-
   /**
    * @brief Clone a copy of the partition scheme header.
    *
@@ -138,6 +128,16 @@ class Physical : public OptimizerTree<Physical> {
     return nullptr;
   }
 
+ protected:
+  /**
+   * @brief Constructor.
+   *
+   * @param partition_scheme_header The partition scheme header of the relation.
+   *        The constructor takes ownership of 'partition_scheme_header'.
+   */
+  explicit Physical(PartitionSchemeHeader *partition_scheme_header = nullptr)
+      : partition_scheme_header_(partition_scheme_header) {}
+
   std::unique_ptr<PartitionSchemeHeader> partition_scheme_header_;
 
  private:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 1ad8f40..f8df32b 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -28,6 +28,7 @@ add_library(quickstep_queryoptimizer_rules_ExtractCommonSubexpression
 add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
+add_library(quickstep_queryoptimizer_rules_Partition Partition.cpp Partition.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownFilter PushDownFilter.cpp PushDownFilter.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
@@ -155,6 +156,29 @@ target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_utility_Macros
                       quickstep_utility_SqlError
                       quickstep_utility_VectorUtil)
+target_link_libraries(quickstep_queryoptimizer_rules_Partition
+                      glog
+                      gtest
+                      quickstep_queryoptimizer_OptimizerContext
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TableReference
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_utility_Cast
+                      quickstep_utility_EqualsAnyConstant
+                      quickstep_utility_Macros
+                      ${GFLAGS_LIB_NAME})
 target_link_libraries(quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExpressionUtil
@@ -379,6 +403,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_InjectJoinFilters
+                      quickstep_queryoptimizer_rules_Partition
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownFilter
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/rules/InjectJoinFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.cpp b/query_optimizer/rules/InjectJoinFilters.cpp
index 0fcd06b..0aa2f5b 100644
--- a/query_optimizer/rules/InjectJoinFilters.cpp
+++ b/query_optimizer/rules/InjectJoinFilters.cpp
@@ -178,7 +178,8 @@ P::PhysicalPtr InjectJoinFilters::transformHashJoinToFilters(
                                  hash_join->right_join_attributes(),
                                  hash_join->project_expressions(),
                                  build_side_filter_predicate,
-                                 is_anti_join);
+                                 is_anti_join,
+                                 hash_join->cloneOutputPartitionSchemeHeader());
   }
 
   if (has_changed_children) {
@@ -250,7 +251,8 @@ physical::PhysicalPtr InjectJoinFilters::pushDownFiltersInternal(
                                  filter_join->build_attributes(),
                                  E::ToNamedExpressions(probe_child->getOutputAttributes()),
                                  filter_join->build_side_filter_predicate(),
-                                 filter_join->is_anti_join());
+                                 filter_join->is_anti_join(),
+                                 filter_join->cloneOutputPartitionSchemeHeader());
   } else {
     return filter_join;
   }
@@ -325,7 +327,8 @@ physical::PhysicalPtr InjectJoinFilters::addFilterAnchors(
         std::static_pointer_cast<const P::FilterJoin>(output_with_new_children);
     return P::Selection::Create(filter_join,
                                 filter_join->project_expressions(),
-                                nullptr);
+                                nullptr,
+                                filter_join->cloneOutputPartitionSchemeHeader());
   } else {
     return output_with_new_children;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/rules/Partition.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/Partition.cpp b/query_optimizer/rules/Partition.cpp
new file mode 100644
index 0000000..074c4e7
--- /dev/null
+++ b/query_optimizer/rules/Partition.cpp
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/Partition.hpp"
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TableReference.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "utility/Cast.hpp"
+#include "utility/EqualsAnyConstant.hpp"
+
+#include "gflags/gflags.h"
+#include "glog/logging.h"
+
+using std::make_unique;
+using std::move;
+using std::size_t;
+using std::static_pointer_cast;
+using std::unordered_set;
+using std::vector;
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = expressions;
+namespace P = physical;
+
+static bool ValidateNumRepartitions(const char *flagname, std::uint64_t value) {
+  return value > 1u;
+}
+DEFINE_uint64(num_repartitions, 4, "Number of repartitions for a join.");
+static const volatile bool num_repartitions_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_num_repartitions, &ValidateNumRepartitions);
+
+void Partition::init(const P::PhysicalPtr &input) {
+  P::TopLevelPlanPtr top_level_plan;
+  CHECK(P::SomeTopLevelPlan::MatchesWithConditionalCast(input, &top_level_plan));
+
+  cost_model_ = make_unique<cost::StarSchemaSimpleCostModel>(top_level_plan->shared_subplans());
+}
+
+namespace {
+
+bool needsSelection(const P::PhysicalType physical_type) {
+  return QUICKSTEP_EQUALS_ANY_CONSTANT(physical_type,
+                                       P::PhysicalType::kSharedSubplanReference,
+                                       P::PhysicalType::kSort,
+                                       P::PhysicalType::kTableReference,
+                                       P::PhysicalType::kUnionAll);
+}
+
+P::PhysicalPtr Repartition(const P::PhysicalPtr &node, const vector<E::AttributeReferencePtr> &join_attributes,
+                           const size_t num_partitions) {
+  P::PartitionSchemeHeader::PartitionExprIds repartition_expr_ids;
+  for (const E::AttributeReferencePtr &attr : join_attributes) {
+    repartition_expr_ids.push_back({ attr->id() });
+  }
+  auto repartition_scheme_header = make_unique<P::PartitionSchemeHeader>(
+      P::PartitionSchemeHeader::PartitionType::kHash, num_partitions, move(repartition_expr_ids));
+
+  if (needsSelection(node->getPhysicalType())) {
+    // Add a Selection node.
+    return P::Selection::Create(node,
+                                CastSharedPtrVector<E::NamedExpression>(node->getOutputAttributes()),
+                                nullptr /* filter_predicate */, repartition_scheme_header.release());
+  } else {
+    // Overwrite the output partition scheme header of the node.
+    return node->copyWithNewOutputPartitionSchemeHeader(repartition_scheme_header.release());
+  }
+}
+
+}  // namespace
+
+P::PhysicalPtr Partition::applyToNode(const P::PhysicalPtr &node) {
+  // Will be used for aggregations.
+  (void) optimizer_context_;
+
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr hash_join = static_pointer_cast<const P::HashJoin>(node);
+
+      P::PhysicalPtr left = hash_join->left();
+      const P::PartitionSchemeHeader *left_partition_scheme_header =
+          left->getOutputPartitionSchemeHeader();
+
+      P::PhysicalPtr right = hash_join->right();
+      const P::PartitionSchemeHeader *right_partition_scheme_header =
+          right->getOutputPartitionSchemeHeader();
+
+      if (!left_partition_scheme_header && !right_partition_scheme_header) {
+        break;
+      }
+
+      const auto &left_join_attributes = hash_join->left_join_attributes();
+      const auto &right_join_attributes = hash_join->right_join_attributes();
+
+      bool left_needs_repartition = false;
+      bool right_needs_repartition = false;
+      size_t num_partitions = 1u;
+
+      needsRepartitionForHashJoin(left_partition_scheme_header, left_join_attributes,
+                                  right_partition_scheme_header, right_join_attributes,
+                                  &left_needs_repartition, &right_needs_repartition, &num_partitions);
+      // Repartition.
+      if (left_needs_repartition) {
+        left = Repartition(left, left_join_attributes, num_partitions);
+      }
+
+      if (right_needs_repartition) {
+        right = Repartition(right, right_join_attributes, num_partitions);
+      }
+
+      unordered_set<E::ExprId> project_expr_ids;
+      for (const E::AttributeReferencePtr &project_expression : hash_join->getOutputAttributes()) {
+        project_expr_ids.insert(project_expression->id());
+      }
+
+      P::PartitionSchemeHeader::PartitionExprIds output_repartition_expr_ids;
+      for (size_t i = 0; i < left_join_attributes.size(); ++i) {
+        const E::ExprId left_join_id = left_join_attributes[i]->id();
+        const E::ExprId right_join_id = right_join_attributes[i]->id();
+
+        output_repartition_expr_ids.emplace_back();
+
+        if (project_expr_ids.count(left_join_id)) {
+          output_repartition_expr_ids.back().insert(left_join_id);
+        }
+
+        if (project_expr_ids.count(right_join_id)) {
+          output_repartition_expr_ids.back().insert(right_join_id);
+        }
+
+        if (output_repartition_expr_ids.back().empty()) {
+          // Some partition attribute will be projected out, so we use
+          // the input partition id as the output partition id.
+          output_repartition_expr_ids.clear();
+          break;
+        }
+      }
+      auto output_partition_scheme_header = make_unique<P::PartitionSchemeHeader>(
+          P::PartitionSchemeHeader::PartitionType::kHash, num_partitions, move(output_repartition_expr_ids));
+      if (left_needs_repartition || right_needs_repartition) {
+        return P::HashJoin::Create(left, right, left_join_attributes, right_join_attributes,
+                                   hash_join->residual_predicate(),
+                                   hash_join->project_expressions(),
+                                   hash_join->join_type(),
+                                   output_partition_scheme_header.release());
+      } else if (left_partition_scheme_header) {
+        return hash_join->copyWithNewOutputPartitionSchemeHeader(output_partition_scheme_header.release());
+      }
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr selection = static_pointer_cast<const P::Selection>(node);
+
+      const P::PartitionSchemeHeader *input_partition_scheme_header =
+          selection->input()->getOutputPartitionSchemeHeader();
+      if (input_partition_scheme_header && input_partition_scheme_header->isHashPartition()) {
+        unordered_set<E::ExprId> project_expr_ids;
+        for (const E::AttributeReferencePtr &project_expression : selection->getOutputAttributes()) {
+          project_expr_ids.insert(project_expression->id());
+        }
+
+        const auto &input_partition_expr_ids = input_partition_scheme_header->partition_expr_ids;
+        P::PartitionSchemeHeader::PartitionExprIds output_partition_expr_ids;
+        for (const auto &equivalent_expr_ids : input_partition_expr_ids) {
+          P::PartitionSchemeHeader::EquivalentPartitionExprIds output_equivalent_partition_expr_ids;
+          for (const E::ExprId expr_id : equivalent_expr_ids) {
+            if (project_expr_ids.find(expr_id) != project_expr_ids.end()) {
+              output_equivalent_partition_expr_ids.insert(expr_id);
+            }
+          }
+
+          if (!output_equivalent_partition_expr_ids.empty()) {
+            output_partition_expr_ids.push_back(move(output_equivalent_partition_expr_ids));
+          }
+        }
+
+        if (input_partition_expr_ids != output_partition_expr_ids) {
+          auto output_partition_scheme_header = make_unique<P::PartitionSchemeHeader>(
+              P::PartitionSchemeHeader::PartitionType::kHash,
+              input_partition_scheme_header->num_partitions,
+              move(output_partition_expr_ids));
+          return selection->copyWithNewOutputPartitionSchemeHeader(output_partition_scheme_header.release());
+        }
+      }
+      break;
+    }
+    default:
+      break;
+  }
+  return node;
+}
+
+void Partition::needsRepartitionForHashJoin(
+    const P::PartitionSchemeHeader *left_partition_scheme_header,
+    const vector<E::AttributeReferencePtr> &left_join_attributes,
+    const P::PartitionSchemeHeader *right_partition_scheme_header,
+    const vector<E::AttributeReferencePtr> &right_join_attributes,
+    bool *left_needs_repartition,
+    bool *right_needs_repartition,
+    size_t *num_partitions) {
+  DCHECK(left_partition_scheme_header || right_partition_scheme_header);
+
+  *left_needs_repartition = false;
+  *num_partitions = 1u;
+
+  if (left_partition_scheme_header) {
+    *num_partitions = left_partition_scheme_header->num_partitions;
+
+    // Need to repartition unless the partition attributes are the same as
+    // the join attributes.
+    *left_needs_repartition = true;
+    if (left_partition_scheme_header->isHashPartition()) {
+      unordered_set<E::ExprId> left_join_expr_ids;
+      for (const E::AttributeReferencePtr &attr : left_join_attributes) {
+        left_join_expr_ids.insert(attr->id());
+      }
+
+      if (left_partition_scheme_header->reusablePartitionScheme(left_join_expr_ids)) {
+        *left_needs_repartition = false;
+        *num_partitions = left_partition_scheme_header->num_partitions;
+      }
+    }
+  } else if (right_partition_scheme_header) {
+    *left_needs_repartition = true;
+    *num_partitions = right_partition_scheme_header->num_partitions;
+  }
+
+  *right_needs_repartition = false;
+  if (right_partition_scheme_header) {
+    // Need to repartition unless the partition attributes are the same as
+    // the join attributes.
+    *right_needs_repartition = true;
+    if (right_partition_scheme_header->isHashPartition()) {
+      unordered_set<E::ExprId> right_join_expr_ids;
+      for (const E::AttributeReferencePtr &attr : right_join_attributes) {
+        right_join_expr_ids.insert(attr->id());
+      }
+
+      if (right_partition_scheme_header->reusablePartitionScheme(right_join_expr_ids) &&
+          (*left_needs_repartition || *num_partitions == right_partition_scheme_header->num_partitions)) {
+        *right_needs_repartition = false;
+        *num_partitions = right_partition_scheme_header->num_partitions;
+      }
+    }
+  } else if (*left_needs_repartition) {
+    // TODO(quickstep-team): use a cost model to choose the broadcast hash join
+    // or repartitioned hash join if the right side has no partitions while the
+    // left side needs to repartition. For now, we always use the latter.
+    *right_needs_repartition = true;
+  }
+
+  if (*right_needs_repartition && *left_needs_repartition) {
+    *num_partitions = FLAGS_num_repartitions;
+  }
+
+  DCHECK_NE(1u, *num_partitions);
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/rules/Partition.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/Partition.hpp b/query_optimizer/rules/Partition.hpp
new file mode 100644
index 0000000..4421dc1
--- /dev/null
+++ b/query_optimizer/rules/Partition.hpp
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_PARTITION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_PARTITION_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+#include "gtest/gtest_prod.h"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule over physical plan nodes (a BottomUpRule) that adds partition
+ *        support. It declares the decision of whether the inputs of a hash
+ *        join need to be repartitioned (see needsRepartitionForHashJoin).
+ */
+class Partition final : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param optimizer_context The optimizer context.
+   */
+  explicit Partition(OptimizerContext *optimizer_context)
+      : optimizer_context_(optimizer_context) {
+  }
+
+  ~Partition() override {}
+
+  std::string getName() const override { return "Partition"; }
+
+ protected:
+  void init(const physical::PhysicalPtr &input) override;
+
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &input) override;
+
+ private:
+  /*
+   * Whether the left or the right side of a hash join needs to repartition.
+   *
+   * Rows are the partition scheme of the right side, and columns are that of
+   * the left side. Each cell reads
+   *   "<right needs repartition> \ <left needs repartition>".
+   *
+   * --------------------------------------------------------------------------
+   * | Right \ Left     | No Partition  | Hash Partition h' | Other Partition |
+   * --------------------------------------------------------------------------
+   * | No Partition     | false \ false |  false \ false    |  true \ true    |
+   * --------------------------------------------------------------------------
+   * | Hash Partition h | false \ true  | false# \ false    | false \ true    |
+   * --------------------------------------------------------------------------
+   * | Other Partition  |  true \ true  |   true \ false    |  true \ true    |
+   * --------------------------------------------------------------------------
+   *
+   * Hash Partition h / h': the partition attributes are the same as the join attributes.
+   * #: If h and h' have different numbers of partitions, the right side needs to repartition.
+   *
+   * In the unit tests, a hash partition on attributes other than the join
+   * attributes yields the same results as "Other Partition", and a NULL
+   * header stands for "No Partition".
+   *
+   * The results are written to left_needs_repartition,
+   * right_needs_repartition and num_partitions. When both sides need to
+   * repartition, num_partitions is taken from the num_repartitions flag.
+   *
+   * NOTE(review): std::size_t is used below, but <cstddef> is not among the
+   * includes of this header.
+   */
+  static void needsRepartitionForHashJoin(const physical::PartitionSchemeHeader *left_partition_scheme_header,
+                                          const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
+                                          const physical::PartitionSchemeHeader *right_partition_scheme_header,
+                                          const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+                                          bool *left_needs_repartition,
+                                          bool *right_needs_repartition,
+                                          std::size_t *num_partitions);
+
+  OptimizerContext *optimizer_context_;
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  friend class PartitionTest;
+
+  DISALLOW_COPY_AND_ASSIGN(Partition);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_PARTITION_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/rules/tests/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/tests/CMakeLists.txt b/query_optimizer/rules/tests/CMakeLists.txt
index 0d913e2..42fa9c1 100644
--- a/query_optimizer/rules/tests/CMakeLists.txt
+++ b/query_optimizer/rules/tests/CMakeLists.txt
@@ -40,6 +40,7 @@ target_link_libraries(quickstep_queryoptimizer_rules_tests_PhysicalRuleTest
 add_executable(quickstep_queryoptimizer_rules_tests
                "${CMAKE_CURRENT_SOURCE_DIR}/CollapseProject_unittest.cpp"
                "${CMAKE_CURRENT_SOURCE_DIR}/GenerateJoins_unittest.cpp"
+               "${CMAKE_CURRENT_SOURCE_DIR}/Partition_unittest.cpp"
                "${CMAKE_CURRENT_SOURCE_DIR}/PruneColumns_unittest.cpp"
                "${CMAKE_CURRENT_SOURCE_DIR}/PushDownFilter_unittest.cpp"
                "${CMAKE_CURRENT_SOURCE_DIR}/UpdateExpression_unittest.cpp")
@@ -70,11 +71,13 @@ target_link_libraries(quickstep_queryoptimizer_rules_tests
                       quickstep_queryoptimizer_logical_TableReference
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_Selection
                       quickstep_queryoptimizer_physical_TableReference
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_GenerateJoins
+                      quickstep_queryoptimizer_rules_Partition
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownFilter
                       quickstep_queryoptimizer_rules_Rule
@@ -91,5 +94,6 @@ target_link_libraries(quickstep_queryoptimizer_rules_tests
                       quickstep_types_operations_comparisons_ComparisonID
                       quickstep_utility_Cast
                       quickstep_utility_Macros
+                      ${GFLAGS_LIB_NAME}
                       ${LIBS})
 add_test(quickstep_queryoptimizer_rules_tests quickstep_queryoptimizer_rules_tests)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/rules/tests/Partition_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/tests/Partition_unittest.cpp b/query_optimizer/rules/tests/Partition_unittest.cpp
new file mode 100644
index 0000000..d755e18
--- /dev/null
+++ b/query_optimizer/rules/tests/Partition_unittest.cpp
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/Partition.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/rules/tests/PhysicalRuleTest.hpp"
+#include "utility/Macros.hpp"
+
+#include "gflags/gflags.h"
+#include "gtest/gtest.h"
+
+using std::make_unique;
+using std::move;
+using std::size_t;
+
+namespace quickstep {
+namespace optimizer {
+
+DECLARE_uint64(num_repartitions);
+
+namespace E = expressions;
+namespace P = physical;
+
+// Test fixture for Partition::needsRepartitionForHashJoin(). A test selects the
+// partition scheme headers of the two join inputs (a header that is left unset
+// is passed as NULL), calls needsRepartitionForHashJoin(), and then checks the
+// three output members.
+class PartitionTest : public PhysicalRuleTest {
+ protected:
+  PartitionTest() {}
+
+  ~PartitionTest() override {}
+
+  void SetUp() override {
+    PhysicalRuleTest::SetUp();
+  }
+
+  // Creates the rule under test.
+  void setupRule(std::unique_ptr<Rule<P::Physical>> *rule) override {
+    *rule = make_unique<Partition>(optimizer_context());
+  }
+
+  // Calls Partition::needsRepartitionForHashJoin() with the current headers,
+  // using relation_attribute_reference_0_0_ as the left join attribute and
+  // relation_attribute_reference_1_0_ as the right one.
+  //
+  // The outputs are cleared first, so that a value left over from a previous
+  // call (e.g. an earlier iteration of a loop inside a test) can never satisfy
+  // an expectation by accident.
+  void needsRepartitionForHashJoin() {
+    left_needs_repartition_ = false;
+    right_needs_repartition_ = false;
+    num_partitions_ = 0u;
+
+    Partition::needsRepartitionForHashJoin(
+        left_partition_scheme_header_.get(), { relation_attribute_reference_0_0_ },
+        right_partition_scheme_header_.get(), { relation_attribute_reference_1_0_ },
+        &left_needs_repartition_, &right_needs_repartition_, &num_partitions_);
+  }
+
+  // Hash-partitions the left side on its join attribute.
+  void useLeftHashPartitionMatchedJoinAttributes(const size_t num_partition) {
+    P::PartitionSchemeHeader::PartitionExprIds left_partition_expr_ids;
+    left_partition_expr_ids.push_back({ relation_attribute_reference_0_0_->id() });
+    left_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        P::PartitionSchemeHeader::PartitionType::kHash,
+        num_partition, move(left_partition_expr_ids));
+  }
+
+  // Hash-partitions the left side on an attribute other than its join attribute.
+  void useLeftHashPartitionNotMatchedJoinAttributes(const size_t num_partition) {
+    P::PartitionSchemeHeader::PartitionExprIds left_partition_expr_ids;
+    left_partition_expr_ids.push_back({ relation_attribute_reference_0_1_->id() });
+    left_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        P::PartitionSchemeHeader::PartitionType::kHash,
+        num_partition, move(left_partition_expr_ids));
+  }
+
+  // Hash-partitions the right side on its join attribute.
+  void useRightHashPartitionMatchedJoinAttributes(const size_t num_partition) {
+    P::PartitionSchemeHeader::PartitionExprIds right_partition_expr_ids;
+    right_partition_expr_ids.push_back({ relation_attribute_reference_1_0_->id() });
+    right_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        P::PartitionSchemeHeader::PartitionType::kHash,
+        num_partition, move(right_partition_expr_ids));
+  }
+
+  // Hash-partitions the right side on an attribute other than its join attribute.
+  void useRightHashPartitionNotMatchedJoinAttributes(const size_t num_partition) {
+    P::PartitionSchemeHeader::PartitionExprIds right_partition_expr_ids;
+    right_partition_expr_ids.push_back({ relation_attribute_reference_1_1_->id() });
+    right_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        P::PartitionSchemeHeader::PartitionType::kHash,
+        num_partition, move(right_partition_expr_ids));
+  }
+
+  // Inputs of the call under test.
+  std::unique_ptr<P::PartitionSchemeHeader> left_partition_scheme_header_, right_partition_scheme_header_;
+
+  // Outputs of the call under test. They are initialized here so that they
+  // never hold indeterminate values.
+  bool left_needs_repartition_ = false, right_needs_repartition_ = false;
+  size_t num_partitions_ = 0u;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(PartitionTest);
+};
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionMatchedJoinAttributesRightNoPartitionTest) {
+  // Left side is hash-partitioned on its join attribute; right side has no partition scheme.
+  const size_t kExpectedNumPartitions = 64u;
+  useLeftHashPartitionMatchedJoinAttributes(kExpectedNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Neither side is repartitioned, and the left partition count is kept.
+  EXPECT_FALSE(left_needs_repartition_);
+  EXPECT_FALSE(right_needs_repartition_);
+  EXPECT_EQ(kExpectedNumPartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionNotMatchedJoinAttributesRightNoPartitionTest) {
+  // Left side is hash-partitioned on a non-join attribute; right side has no partition scheme.
+  const size_t kLeftNumPartitions = 16u;
+  useLeftHashPartitionNotMatchedJoinAttributes(kLeftNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Both sides are repartitioned into the flag-configured number of partitions.
+  EXPECT_TRUE(left_needs_repartition_);
+  EXPECT_TRUE(right_needs_repartition_);
+  EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftNonHashPartitionRightNoPartitionTest) {
+  // Left side uses a non-hash (random or range) scheme; right side has no partition scheme.
+  const P::PartitionSchemeHeader::PartitionType kNonHashPartitionTypes[] = {
+      P::PartitionSchemeHeader::PartitionType::kRandom,
+      P::PartitionSchemeHeader::PartitionType::kRange };
+
+  for (const auto left_partition_type : kNonHashPartitionTypes) {
+    left_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        left_partition_type, 8u, P::PartitionSchemeHeader::PartitionExprIds());
+    needsRepartitionForHashJoin();
+
+    // Both sides are repartitioned into the flag-configured number of partitions.
+    EXPECT_TRUE(left_needs_repartition_);
+    EXPECT_TRUE(right_needs_repartition_);
+    EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+  }
+}
+
+TEST_F(PartitionTest, HashJoinLeftNoPartitionRightHashPartitionMatchedJoinAttributesTest) {
+  // Left side has no partition scheme; right side is hash-partitioned on its join attribute.
+  const size_t kExpectedNumPartitions = 64u;
+  useRightHashPartitionMatchedJoinAttributes(kExpectedNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Only the left side is repartitioned, using the partition count of the right side.
+  EXPECT_TRUE(left_needs_repartition_);
+  EXPECT_FALSE(right_needs_repartition_);
+  EXPECT_EQ(kExpectedNumPartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionMatchedJoinAttributesRightHashPartitionMatchedJoinAttributesTest) {
+  // Both sides are hash-partitioned on their join attributes, with equal partition counts.
+  const size_t kSharedNumPartitions = 64u;
+  useLeftHashPartitionMatchedJoinAttributes(kSharedNumPartitions);
+  useRightHashPartitionMatchedJoinAttributes(kSharedNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Neither side is repartitioned.
+  EXPECT_FALSE(left_needs_repartition_);
+  EXPECT_FALSE(right_needs_repartition_);
+  EXPECT_EQ(kSharedNumPartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest,
+       HashJoinLeftHashPartitionMatchedJoinAttributesRightHashPartitionMatchedJoinAttributesNotEqualNumPartitionsTest) {
+  // Both sides are hash-partitioned on their join attributes, but with different partition counts.
+  const size_t kExpectedNumPartitions = 128u;
+  const size_t kRightNumPartitions = 64u;
+  useLeftHashPartitionMatchedJoinAttributes(kExpectedNumPartitions);
+  useRightHashPartitionMatchedJoinAttributes(kRightNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Only the right side is repartitioned, using the partition count of the left side.
+  EXPECT_FALSE(left_needs_repartition_);
+  EXPECT_TRUE(right_needs_repartition_);
+  EXPECT_EQ(kExpectedNumPartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionNotMatchedJoinAttributesRightHashPartitionMatchedJoinAttributesTest) {
+  // Left side is hash-partitioned on a non-join attribute; right side on its join attribute.
+  const size_t kLeftNumPartitions = 16u;
+  const size_t kExpectedNumPartitions = 64u;
+  useLeftHashPartitionNotMatchedJoinAttributes(kLeftNumPartitions);
+  useRightHashPartitionMatchedJoinAttributes(kExpectedNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Only the left side is repartitioned, using the partition count of the right side.
+  EXPECT_TRUE(left_needs_repartition_);
+  EXPECT_FALSE(right_needs_repartition_);
+  EXPECT_EQ(kExpectedNumPartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftNonHashPartitionRightHashPartitionMatchedJoinAttributesTest) {
+  // Right side is hash-partitioned on its join attribute; left side uses a non-hash scheme.
+  const size_t kExpectedNumPartitions = 64u;
+  useRightHashPartitionMatchedJoinAttributes(kExpectedNumPartitions);
+
+  const P::PartitionSchemeHeader::PartitionType kNonHashPartitionTypes[] = {
+      P::PartitionSchemeHeader::PartitionType::kRandom,
+      P::PartitionSchemeHeader::PartitionType::kRange };
+
+  for (const auto left_partition_type : kNonHashPartitionTypes) {
+    left_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        left_partition_type, 8u, P::PartitionSchemeHeader::PartitionExprIds());
+    needsRepartitionForHashJoin();
+
+    // Only the left side is repartitioned, using the partition count of the right side.
+    EXPECT_TRUE(left_needs_repartition_);
+    EXPECT_FALSE(right_needs_repartition_);
+    EXPECT_EQ(kExpectedNumPartitions, num_partitions_);
+  }
+}
+
+TEST_F(PartitionTest, HashJoinLeftNoPartitionRightHashPartitionNotMatchedJoinAttributesTest) {
+  // Left side has no partition scheme; right side is hash-partitioned on a non-join attribute.
+  const size_t kRightNumPartitions = 16u;
+  useRightHashPartitionNotMatchedJoinAttributes(kRightNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Both sides are repartitioned into the flag-configured number of partitions.
+  EXPECT_TRUE(left_needs_repartition_);
+  EXPECT_TRUE(right_needs_repartition_);
+  EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionMatchedJoinAttributesRightHashPartitionNotMatchedJoinAttributesTest) {
+  // Left side is hash-partitioned on its join attribute; right side on a non-join attribute.
+  const size_t kExpectedNumPartitions = 64u;
+  const size_t kRightNumPartitions = 16u;
+  useLeftHashPartitionMatchedJoinAttributes(kExpectedNumPartitions);
+  useRightHashPartitionNotMatchedJoinAttributes(kRightNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Only the right side is repartitioned, using the partition count of the left side.
+  EXPECT_FALSE(left_needs_repartition_);
+  EXPECT_TRUE(right_needs_repartition_);
+  EXPECT_EQ(kExpectedNumPartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionNotMatchedJoinAttributesRightHashPartitionNotMatchedJoinAttributesTest) {
+  // Both sides are hash-partitioned on non-join attributes.
+  const size_t kLeftNumPartitions = 64u;
+  const size_t kRightNumPartitions = 16u;
+  useLeftHashPartitionNotMatchedJoinAttributes(kLeftNumPartitions);
+  useRightHashPartitionNotMatchedJoinAttributes(kRightNumPartitions);
+  needsRepartitionForHashJoin();
+
+  // Both sides are repartitioned into the flag-configured number of partitions.
+  EXPECT_TRUE(left_needs_repartition_);
+  EXPECT_TRUE(right_needs_repartition_);
+  EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+}
+
+TEST_F(PartitionTest, HashJoinLeftNonHashPartitionRightHashPartitionNotMatchedJoinAttributesTest) {
+  // Right side is hash-partitioned on a non-join attribute; left side uses a non-hash scheme.
+  const size_t kRightNumPartitions = 16u;
+  useRightHashPartitionNotMatchedJoinAttributes(kRightNumPartitions);
+
+  const P::PartitionSchemeHeader::PartitionType kNonHashPartitionTypes[] = {
+      P::PartitionSchemeHeader::PartitionType::kRandom,
+      P::PartitionSchemeHeader::PartitionType::kRange };
+
+  for (const auto left_partition_type : kNonHashPartitionTypes) {
+    left_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        left_partition_type, 8u, P::PartitionSchemeHeader::PartitionExprIds());
+    needsRepartitionForHashJoin();
+
+    // Both sides are repartitioned into the flag-configured number of partitions.
+    EXPECT_TRUE(left_needs_repartition_);
+    EXPECT_TRUE(right_needs_repartition_);
+    EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+  }
+}
+
+TEST_F(PartitionTest, HashJoinLeftNoPartitionRightNonHashPartitionTest) {
+  // Left side has no partition scheme; right side uses a non-hash (random or range) scheme.
+  const P::PartitionSchemeHeader::PartitionType kNonHashPartitionTypes[] = {
+      P::PartitionSchemeHeader::PartitionType::kRandom,
+      P::PartitionSchemeHeader::PartitionType::kRange };
+
+  for (const auto right_partition_type : kNonHashPartitionTypes) {
+    right_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        right_partition_type, 64u, P::PartitionSchemeHeader::PartitionExprIds());
+    needsRepartitionForHashJoin();
+
+    // Both sides are repartitioned into the flag-configured number of partitions.
+    EXPECT_TRUE(left_needs_repartition_);
+    EXPECT_TRUE(right_needs_repartition_);
+    EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+  }
+}
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionMatchedJoinAttributesRightNonHashPartitionTest) {
+  // Left side is hash-partitioned on its join attribute; right side uses a non-hash scheme.
+  const size_t kExpectedNumPartitions = 128u;
+  useLeftHashPartitionMatchedJoinAttributes(kExpectedNumPartitions);
+
+  const P::PartitionSchemeHeader::PartitionType kNonHashPartitionTypes[] = {
+      P::PartitionSchemeHeader::PartitionType::kRandom,
+      P::PartitionSchemeHeader::PartitionType::kRange };
+
+  for (const auto right_partition_type : kNonHashPartitionTypes) {
+    right_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        right_partition_type, 64u, P::PartitionSchemeHeader::PartitionExprIds());
+    needsRepartitionForHashJoin();
+
+    // Only the right side is repartitioned, using the partition count of the left side.
+    EXPECT_FALSE(left_needs_repartition_);
+    EXPECT_TRUE(right_needs_repartition_);
+    EXPECT_EQ(kExpectedNumPartitions, num_partitions_);
+  }
+}
+
+TEST_F(PartitionTest, HashJoinLeftHashPartitionNotMatchedJoinAttributesRightNonHashPartitionTest) {
+  // Left side is hash-partitioned on a non-join attribute; right side uses a non-hash scheme.
+  const size_t kLeftNumPartitions = 16u;
+  useLeftHashPartitionNotMatchedJoinAttributes(kLeftNumPartitions);
+
+  const P::PartitionSchemeHeader::PartitionType kNonHashPartitionTypes[] = {
+      P::PartitionSchemeHeader::PartitionType::kRandom,
+      P::PartitionSchemeHeader::PartitionType::kRange };
+
+  for (const auto right_partition_type : kNonHashPartitionTypes) {
+    right_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        right_partition_type, 64u, P::PartitionSchemeHeader::PartitionExprIds());
+    needsRepartitionForHashJoin();
+
+    // Both sides are repartitioned into the flag-configured number of partitions.
+    EXPECT_TRUE(left_needs_repartition_);
+    EXPECT_TRUE(right_needs_repartition_);
+    EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+  }
+}
+
+TEST_F(PartitionTest, HashJoinLeftNonHashPartitionRightNonHashPartitionTest) {
+  // Covers every combination of non-hash (random or range) schemes on the two sides.
+  const P::PartitionSchemeHeader::PartitionType kNonHashPartitionTypes[] = {
+      P::PartitionSchemeHeader::PartitionType::kRandom,
+      P::PartitionSchemeHeader::PartitionType::kRange };
+
+  for (const auto left_partition_type : kNonHashPartitionTypes) {
+    left_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+        left_partition_type, 8u, P::PartitionSchemeHeader::PartitionExprIds());
+
+    // The two loop variables carry distinct names, so that the inner one does
+    // not shadow the outer one.
+    for (const auto right_partition_type : kNonHashPartitionTypes) {
+      right_partition_scheme_header_ = make_unique<P::PartitionSchemeHeader>(
+          right_partition_type, 64u, P::PartitionSchemeHeader::PartitionExprIds());
+
+      needsRepartitionForHashJoin();
+
+      // Both sides are repartitioned into the flag-configured number of partitions.
+      EXPECT_TRUE(left_needs_repartition_);
+      EXPECT_TRUE(right_needs_repartition_);
+      EXPECT_EQ(FLAGS_num_repartitions, num_partitions_);
+    }
+  }
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4b896349/query_optimizer/tests/execution_generator/Partition.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Partition.test b/query_optimizer/tests/execution_generator/Partition.test
index ab05391..eb3ec98 100644
--- a/query_optimizer/tests/execution_generator/Partition.test
+++ b/query_optimizer/tests/execution_generator/Partition.test
@@ -15,19 +15,35 @@
 # specific language governing permissions and limitations
 # under the License.
 
-CREATE TABLE foo (id INT NULL,
-                  name CHAR(20))
+CREATE TABLE dim_4_hash_partitions (id INT NULL,
+                                    char_col CHAR(20))
 PARTITION BY HASH(id) PARTITIONS 4;
+CREATE TABLE dim_2_hash_partitions (id INT NULL,
+                                    char_col CHAR(20))
+PARTITION BY HASH(id) PARTITIONS 2;
+CREATE TABLE fact (id INT NULL,
+                   score DOUBLE NULL)
+PARTITION BY HASH(id) PARTITIONS 4;
+
+INSERT INTO dim_4_hash_partitions
+SELECT int_col, char_col
+FROM test
+WHERE int_col > 0 OR int_col < 0;
 
-INSERT INTO foo
+INSERT INTO dim_2_hash_partitions
 SELECT int_col, char_col
 FROM test
 WHERE int_col > 0 OR int_col < 0;
 
-SELECT * FROM foo;
+INSERT INTO fact
+SELECT int_col, double_col
+FROM test
+WHERE int_col % 2 = 0;
+
+SELECT * FROM dim_4_hash_partitions;
 --
 +-----------+--------------------+
-|id         |name                |
+|id         |char_col            |
 +-----------+--------------------+
 |          4|          4 2.000000|
 |          8|          8 2.828427|
@@ -52,3 +68,62 @@ SELECT * FROM foo;
 |        -17|        -17 4.123106|
 |        -21|        -21 4.582576|
 +-----------+--------------------+
+==
+
+# Partitioned Hash Join.
+SELECT fact.id, dim_4_hash_partitions.char_col
+FROM dim_4_hash_partitions JOIN fact ON dim_4_hash_partitions.id = fact.id;
+--
++-----------+--------------------+
+|id         |char_col            |
++-----------+--------------------+
+|          4|          4 2.000000|
+|          8|          8 2.828427|
+|         12|         12 3.464102|
+|         16|         16 4.000000|
+|         24|         24 4.898979|
+|          2|          2 1.414214|
+|          6|          6 2.449490|
+|         14|         14 3.741657|
+|         18|         18 4.242641|
+|         22|         22 4.690416|
++-----------+--------------------+
+==
+
+# Hash Join with two stored relations, one of which is partitioned.
+SELECT fact.id, test.char_col
+FROM test JOIN fact ON test.int_col = fact.id
+WHERE test.int_col > 0 OR test.int_col < 0;
+--
+[same as above]
+==
+
+# Hash Join with one stored, partitioned relation,
+# and a non-stored, non-partitioned one.
+SELECT fact.id, test.char_col
+FROM fact JOIN test ON fact.id = test.int_col
+WHERE test.int_col % 2 = 0;
+--
+[same as above]
+==
+
+# Repartitioned Hash Join.
+SELECT fact.id, dim_2_hash_partitions.char_col
+FROM dim_2_hash_partitions, fact
+WHERE dim_2_hash_partitions.id = fact.id
+  AND dim_2_hash_partitions.id % 2 = 0;
+--
++-----------+--------------------+
+|id         |char_col            |
++-----------+--------------------+
+|          2|          2 1.414214|
+|          4|          4 2.000000|
+|          6|          6 2.449490|
+|          8|          8 2.828427|
+|         12|         12 3.464102|
+|         14|         14 3.741657|
+|         16|         16 4.000000|
+|         18|         18 4.242641|
+|         22|         22 4.690416|
+|         24|         24 4.898979|
++-----------+--------------------+