You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/15 01:47:39 UTC

incubator-quickstep git commit: Using PartitionSchemeHeader in Physical Plan node.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 13c16b9cb -> 5fbfd2111


Using PartitionSchemeHeader in Physical Plan node.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5fbfd211
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5fbfd211
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5fbfd211

Branch: refs/heads/master
Commit: 5fbfd21110091cfaf262daa8d9d2620a4e3f03bb
Parents: 13c16b9
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jun 13 21:50:41 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 20:42:37 2017 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt              |   3 +-
 query_optimizer/ExecutionGenerator.cpp      | 100 ++++++++++++++++-------
 query_optimizer/physical/BinaryJoin.cpp     |   7 ++
 query_optimizer/physical/BinaryJoin.hpp     |   8 +-
 query_optimizer/physical/CMakeLists.txt     |   8 ++
 query_optimizer/physical/HashJoin.cpp       |   4 +-
 query_optimizer/physical/HashJoin.hpp       |  24 ++++--
 query_optimizer/physical/Join.hpp           |   9 +-
 query_optimizer/physical/Physical.hpp       |  52 +++++++++++-
 query_optimizer/physical/Selection.cpp      |  40 ++++++++-
 query_optimizer/physical/Selection.hpp      |  27 ++++--
 query_optimizer/physical/TableGenerator.hpp |  18 +++-
 query_optimizer/physical/TableReference.cpp |  55 +++++++++++++
 query_optimizer/physical/TableReference.hpp |  21 ++---
 query_optimizer/rules/CMakeLists.txt        |   4 +-
 relational_operators/HashJoinOperator.cpp   |   6 ++
 relational_operators/HashJoinOperator.hpp   |   4 +-
 relational_operators/SelectOperator.cpp     |  13 +--
 relational_operators/SelectOperator.hpp     |  10 ++-
 relational_operators/WorkOrder.proto        |   2 +
 relational_operators/WorkOrderFactory.cpp   |   9 +-
 storage/CMakeLists.txt                      |   1 +
 storage/InsertDestination.cpp               |  34 ++------
 storage/InsertDestination.hpp               |  34 +++++++-
 utility/CMakeLists.txt                      |   1 +
 utility/PlanVisualizer.cpp                  |   6 ++
 26 files changed, 394 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index c969f16..04af02c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -103,6 +103,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_InsertTuple
                       quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_PatternMatcher
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
@@ -211,6 +212,7 @@ target_link_libraries(quickstep_queryoptimizer_OptimizerTree
 target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       ${GFLAGS_LIB_NAME}
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
+                      quickstep_queryoptimizer_Validator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
@@ -230,7 +232,6 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_strategy_OneToOne
                       quickstep_queryoptimizer_strategy_Selection
                       quickstep_queryoptimizer_strategy_Strategy
-                      quickstep_queryoptimizer_Validator
                       quickstep_utility_Macros
                       quickstep_utility_PlanVisualizer)
 target_link_libraries(quickstep_queryoptimizer_QueryHandle

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 3b2fe08..a7c7328 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -84,6 +84,7 @@
 #include "query_optimizer/physical/InsertTuple.hpp"
 #include "query_optimizer/physical/LIPFilterConfiguration.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
 #include "query_optimizer/physical/PatternMatcher.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
@@ -140,6 +141,7 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+using std::make_unique;
 using std::move;
 using std::static_pointer_cast;
 using std::unique_ptr;
@@ -363,16 +365,52 @@ void ExecutionGenerator::createTemporaryCatalogRelation(
     ++aid;
   }
 
+  const P::PartitionSchemeHeader *partition_scheme_header = physical->getOutputPartitionSchemeHeader();
+  if (partition_scheme_header) {
+    PartitionSchemeHeader::PartitionAttributeIds output_partition_attr_ids;
+    for (const auto &partition_equivalent_expr_ids : partition_scheme_header->partition_expr_ids) {
+      DCHECK(!partition_equivalent_expr_ids.empty());
+      const E::ExprId partition_expr_id = *partition_equivalent_expr_ids.begin();
+      DCHECK(attribute_substitution_map_.find(partition_expr_id) != attribute_substitution_map_.end());
+      output_partition_attr_ids.push_back(attribute_substitution_map_[partition_expr_id]->getID());
+    }
+
+    const size_t num_partition = partition_scheme_header->num_partitions;
+    unique_ptr<PartitionSchemeHeader> output_partition_scheme_header;
+    switch (partition_scheme_header->partition_type) {
+      case P::PartitionSchemeHeader::PartitionType::kHash:
+        output_partition_scheme_header =
+            make_unique<HashPartitionSchemeHeader>(num_partition, move(output_partition_attr_ids));
+        break;
+      case P::PartitionSchemeHeader::PartitionType::kRandom:
+        output_partition_scheme_header =
+            make_unique<RandomPartitionSchemeHeader>(num_partition);
+        break;
+      case P::PartitionSchemeHeader::PartitionType::kRange:
+        LOG(FATAL) << "Unimplemented";
+      default:
+        LOG(FATAL) << "Unknown partition type";
+    }
+    auto output_partition_scheme = make_unique<PartitionScheme>(output_partition_scheme_header.release());
+
+    insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+    insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+        ->MergeFrom(output_partition_scheme->getProto());
+
+    catalog_relation->setPartitionScheme(output_partition_scheme.release());
+  } else {
+    insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
+  }
+
   *catalog_relation_output = catalog_relation.get();
   const relation_id output_rel_id = catalog_database_->addRelation(
       catalog_relation.release());
 
+  insert_destination_proto->set_relation_id(output_rel_id);
+
 #ifdef QUICKSTEP_DISTRIBUTED
   referenced_relation_ids_.insert(output_rel_id);
 #endif
-
-  insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::BLOCK_POOL);
-  insert_destination_proto->set_relation_id(output_rel_id);
 }
 
 void ExecutionGenerator::dropAllTemporaryRelations() {
@@ -499,13 +537,28 @@ bool ExecutionGenerator::convertSimpleProjection(
 
 void ExecutionGenerator::convertSelection(
     const P::SelectionPtr &physical_selection) {
+  const P::PhysicalPtr input = physical_selection->input();
+  const CatalogRelationInfo *input_relation_info =
+      findRelationInfoOutputByPhysical(input);
+  DCHECK(input_relation_info != nullptr);
+  const CatalogRelation &input_relation = *input_relation_info->relation;
+
   // Check if the Selection is only for renaming columns.
   if (physical_selection->filter_predicate() == nullptr) {
-    const std::vector<E::AttributeReferencePtr> input_attributes =
-        physical_selection->input()->getOutputAttributes();
+    const P::PartitionSchemeHeader *physical_select_partition_scheme_header =
+        physical_selection->getOutputPartitionSchemeHeader();
+    const P::PartitionSchemeHeader *physical_input_partition_scheme_header = input->getOutputPartitionSchemeHeader();
+
+    const bool are_same_physical_partition_scheme_headers =
+        (!physical_select_partition_scheme_header && !physical_input_partition_scheme_header) ||
+        (physical_select_partition_scheme_header && physical_input_partition_scheme_header &&
+         physical_select_partition_scheme_header->equal(*physical_input_partition_scheme_header));
+
+    const std::vector<E::AttributeReferencePtr> input_attributes = input->getOutputAttributes();
+
     const std::vector<E::NamedExpressionPtr> &project_expressions =
         physical_selection->project_expressions();
-    if (project_expressions.size() == input_attributes.size()) {
+    if (project_expressions.size() == input_attributes.size() && are_same_physical_partition_scheme_headers) {
       bool has_different_attrs = false;
       for (std::size_t attr_idx = 0; attr_idx < input_attributes.size(); ++attr_idx) {
         if (project_expressions[attr_idx]->id() != input_attributes[attr_idx]->id()) {
@@ -514,12 +567,9 @@ void ExecutionGenerator::convertSelection(
         }
       }
       if (!has_different_attrs) {
-        const std::unordered_map<P::PhysicalPtr, CatalogRelationInfo>::const_iterator input_catalog_rel_it =
-            physical_to_output_relation_map_.find(physical_selection->input());
-        DCHECK(input_catalog_rel_it != physical_to_output_relation_map_.end());
-        if (!input_catalog_rel_it->second.isStoredRelation()) {
+        if (!input_relation_info->isStoredRelation()) {
           CatalogRelation *catalog_relation =
-              const_cast<CatalogRelation*>(input_catalog_rel_it->second.relation);
+              const_cast<CatalogRelation*>(input_relation_info->relation);
           for (std::size_t attr_idx = 0; attr_idx < project_expressions.size(); ++attr_idx) {
             CatalogAttribute *catalog_attribute =
                 catalog_relation->getAttributeByIdMutable(attr_idx);
@@ -528,7 +578,7 @@ void ExecutionGenerator::convertSelection(
                 project_expressions[attr_idx]->attribute_alias());
           }
           physical_to_output_relation_map_.emplace(physical_selection,
-                                                   input_catalog_rel_it->second);
+                                                   *input_relation_info);
           return;
         }
       }
@@ -560,10 +610,6 @@ void ExecutionGenerator::convertSelection(
                                  insert_destination_proto);
 
   // Create and add a Select operator.
-  const CatalogRelationInfo *input_relation_info =
-      findRelationInfoOutputByPhysical(physical_selection->input());
-  DCHECK(input_relation_info != nullptr);
-  const CatalogRelation &input_relation = *input_relation_info->relation;
   const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
 
   const std::size_t num_partitions =
@@ -799,17 +845,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   S::QueryContext::HashTableContext *hash_table_context_proto =
       query_context_proto_->add_join_hash_tables();
 
-  // No partition.
-  std::size_t num_partitions = 1;
-  if (build_relation->hasPartitionScheme() &&
-      build_attribute_ids.size() == 1) {
-    const PartitionSchemeHeader &partition_scheme_header =
-        build_relation->getPartitionScheme()->getPartitionSchemeHeader();
-    if (build_attribute_ids[0] == partition_scheme_header.getPartitionAttributeIds().front()) {
-      // TODO(zuyu): add optimizer support for partitioned hash joins.
-      hash_table_context_proto->set_num_partitions(num_partitions);
-    }
-  }
+  const P::PartitionSchemeHeader *probe_partition_scheme_header = probe_physical->getOutputPartitionSchemeHeader();
+  const std::size_t probe_num_partitions =
+      probe_partition_scheme_header ? probe_partition_scheme_header->num_partitions : 1u;
+  hash_table_context_proto->set_num_partitions(probe_num_partitions);
+
 
   S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table();
 
@@ -837,7 +877,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
               build_relation_info->isStoredRelation(),
               build_attribute_ids,
               any_build_attributes_nullable,
-              num_partitions,
+              probe_num_partitions,
               join_hash_table_index));
 
   // Create InsertDestination proto.
@@ -880,7 +920,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
               probe_operator_info->isStoredRelation(),
               probe_attribute_ids,
               any_probe_attributes_nullable,
-              num_partitions,
+              probe_num_partitions,
               *output_relation,
               insert_destination_index,
               join_hash_table_index,
@@ -892,7 +932,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
 
   const QueryPlan::DAGNodeIndex destroy_operator_index =
       execution_plan_->addRelationalOperator(new DestroyHashOperator(
-          query_handle_->query_id(), num_partitions, join_hash_table_index));
+          query_handle_->query_id(), probe_num_partitions, join_hash_table_index));
 
   if (!build_relation_info->isStoredRelation()) {
     execution_plan_->addDirectDependency(build_operator_index,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/BinaryJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/BinaryJoin.cpp b/query_optimizer/physical/BinaryJoin.cpp
index 30e2e8d..1928198 100644
--- a/query_optimizer/physical/BinaryJoin.cpp
+++ b/query_optimizer/physical/BinaryJoin.cpp
@@ -23,6 +23,8 @@
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/Physical.hpp"
 #include "utility/Cast.hpp"
 
 namespace quickstep {
@@ -38,6 +40,11 @@ void BinaryJoin::getFieldStringItems(
     std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
     std::vector<std::string> *container_child_field_names,
     std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  if (partition_scheme_header_) {
+    inline_field_names->push_back("output_partition_scheme_header");
+    inline_field_values->push_back(partition_scheme_header_->toString());
+  }
+
   non_container_child_field_names->push_back("left");
   non_container_child_field_names->push_back("right");
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/BinaryJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/BinaryJoin.hpp b/query_optimizer/physical/BinaryJoin.hpp
index 4b5671d..d1f652f 100644
--- a/query_optimizer/physical/BinaryJoin.hpp
+++ b/query_optimizer/physical/BinaryJoin.hpp
@@ -41,6 +41,8 @@ namespace physical {
 class BinaryJoin;
 typedef std::shared_ptr<const BinaryJoin> BinaryJoinPtr;
 
+struct PartitionSchemeHeader;
+
 /**
  * @brief Base class for binary join nodes.
  */
@@ -68,11 +70,13 @@ class BinaryJoin : public Join {
    * @param left The left operand.
    * @param right The right operand.
    * @param project_expressions The project expressions.
+   * @param partition_scheme_header The optional output partition scheme header.
    */
   BinaryJoin(const PhysicalPtr &left,
              const PhysicalPtr &right,
-             const std::vector<expressions::NamedExpressionPtr> &project_expressions)
-      : Join(project_expressions),
+             const std::vector<expressions::NamedExpressionPtr> &project_expressions,
+             PartitionSchemeHeader *partition_scheme_header = nullptr)
+      : Join(project_expressions, partition_scheme_header),
         left_(left),
         right_(right) {
     addChild(left_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index 1e777b1..e0b1d25 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -67,6 +67,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_BinaryJoin
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_physical_Join
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_utility_Cast
                       quickstep_utility_Macros)
@@ -202,9 +203,11 @@ target_link_libraries(quickstep_queryoptimizer_physical_PartitionSchemeHeader
 target_link_libraries(quickstep_queryoptimizer_physical_PatternMatcher
                       quickstep_queryoptimizer_physical_PhysicalType)
 target_link_libraries(quickstep_queryoptimizer_physical_Physical
+                      glog
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_physical_Sample
@@ -225,6 +228,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_Selection
                       quickstep_queryoptimizer_expressions_LogicalAnd
                       quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
@@ -254,6 +258,7 @@ target_link_libraries(quickstep_queryoptimizer_physical_TableGenerator
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
@@ -261,10 +266,13 @@ target_link_libraries(quickstep_queryoptimizer_physical_TableGenerator
 target_link_libraries(quickstep_queryoptimizer_physical_TableReference
                       glog
                       quickstep_catalog_CatalogRelation
+                      quickstep_catalog_PartitionScheme
+                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExpressionUtil
                       quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/HashJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp
index e186072..b0ee913 100644
--- a/query_optimizer/physical/HashJoin.cpp
+++ b/query_optimizer/physical/HashJoin.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/Physical.hpp"
 #include "utility/Cast.hpp"
 
 namespace quickstep {
@@ -79,7 +80,8 @@ bool HashJoin::maybeCopyWithPrunedExpressions(
                      right_join_attributes_,
                      residual_predicate_,
                      new_project_expressions,
-                     join_type_);
+                     join_type_,
+                     cloneOutputPartitionSchemeHeader());
     return true;
   }
   return false;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index c513f77..1804d4b 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -49,6 +49,8 @@ namespace physical {
 class HashJoin;
 typedef std::shared_ptr<const HashJoin> HashJoinPtr;
 
+struct PartitionSchemeHeader;
+
 /**
  * @brief Physical hash join node.
  */
@@ -116,11 +118,18 @@ class HashJoin : public BinaryJoin {
                   right_join_attributes_,
                   residual_predicate_,
                   project_expressions(),
-                  join_type_);
+                  join_type_,
+                  cloneOutputPartitionSchemeHeader());
   }
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
 
+  PhysicalPtr copyWithNewOutputPartitionSchemeHeader(  // Same join, different output header.
+      PartitionSchemeHeader *partition_scheme_header) const override {  // Pointer is handed to Create().
+    return Create(left(), right(), left_join_attributes_, right_join_attributes_,
+                  residual_predicate_, project_expressions(), join_type_, partition_scheme_header);
+  }
+
   bool maybeCopyWithPrunedExpressions(
       const expressions::UnorderedNamedExpressionSet &referenced_expressions,
       PhysicalPtr *output) const override;
@@ -136,6 +145,8 @@ class HashJoin : public BinaryJoin {
    * @param residual_predicate Optional filtering predicate evaluated after join.
    * @param project_expressions The project expressions.
    * @param Join type of this hash join.
+   * @param partition_scheme_header The optional output partition scheme header.
+   *
    * @return An immutable physical HashJoin.
    */
   static HashJoinPtr Create(
@@ -145,7 +156,8 @@ class HashJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const JoinType join_type) {
+      const JoinType join_type,
+      PartitionSchemeHeader *partition_scheme_header = nullptr) {
     return HashJoinPtr(
         new HashJoin(left,
                      right,
@@ -153,7 +165,8 @@ class HashJoin : public BinaryJoin {
                      right_join_attributes,
                      residual_predicate,
                      project_expressions,
-                     join_type));
+                     join_type,
+                     partition_scheme_header));
   }
 
  protected:
@@ -173,8 +186,9 @@ class HashJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const JoinType join_type)
-      : BinaryJoin(left, right, project_expressions),
+      const JoinType join_type,
+      PartitionSchemeHeader *partition_scheme_header)
+      : BinaryJoin(left, right, project_expressions, partition_scheme_header),
         left_join_attributes_(left_join_attributes),
         right_join_attributes_(right_join_attributes),
         residual_predicate_(residual_predicate),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Join.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Join.hpp b/query_optimizer/physical/Join.hpp
index 305aa52..d72d072 100644
--- a/query_optimizer/physical/Join.hpp
+++ b/query_optimizer/physical/Join.hpp
@@ -40,6 +40,8 @@ namespace physical {
 class Join;
 typedef std::shared_ptr<const Join> JoinPtr;
 
+struct PartitionSchemeHeader;
+
 /**
  * @brief Base class for physical join nodes.
  */
@@ -68,10 +70,13 @@ class Join : public Physical {
    * @brief Constructor.
    *
    * @param project_expressions The project expressions.
+   * @param partition_scheme_header The optional output partition scheme header.
    */
   explicit Join(
-      const std::vector<expressions::NamedExpressionPtr>& project_expressions)
-      : project_expressions_(project_expressions) {}
+      const std::vector<expressions::NamedExpressionPtr>& project_expressions,
+      PartitionSchemeHeader *partition_scheme_header = nullptr)
+      : Physical(partition_scheme_header),
+        project_expressions_(project_expressions) {}
 
  private:
   std::vector<expressions::NamedExpressionPtr> project_expressions_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Physical.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Physical.hpp b/query_optimizer/physical/Physical.hpp
index 4bed593..2279a84 100644
--- a/query_optimizer/physical/Physical.hpp
+++ b/query_optimizer/physical/Physical.hpp
@@ -26,10 +26,12 @@
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
-
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 namespace quickstep {
 namespace optimizer {
 namespace physical {
@@ -86,11 +88,57 @@ class Physical : public OptimizerTree<Physical> {
       const expressions::UnorderedNamedExpressionSet &referenced_expressions,
       PhysicalPtr *output) const = 0;
 
+  /**
+   * @brief Creates a copy with the partition scheme header replaced by \p
+   *        partition_scheme_header.
+   *
+   * @note This default implementation logs a fatal error naming the node;
+   *       node types that support the operation override it.
+   *
+   * @param partition_scheme_header The header to be substituted for the
+   *        existing one, if any. This method takes ownership of it.
+   * @return A copy with \p partition_scheme_header as its output header.
+   */
+  virtual PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+      PartitionSchemeHeader *partition_scheme_header) const {
+    std::unique_ptr<PartitionSchemeHeader> new_partition_scheme_header(partition_scheme_header);
+    LOG(FATAL) << "copyWithNewOutputPartitionSchemeHeader is not implemented for " << getName();
+  }
+
+  /**
+   * @brief Get the output partition scheme header of the physical plan node.
+   *
+   * @return The header owned by this node, or NULL if it has none.
+   **/
+  const PartitionSchemeHeader* getOutputPartitionSchemeHeader() const {
+    return partition_scheme_header_.get();
+  }
+
  protected:
   /**
    * @brief Constructor.
+   *
+   * @param partition_scheme_header The partition scheme header of the relation.
+   *        The constructor takes ownership of 'partition_scheme_header'.
    */
-  Physical() {}
+  explicit Physical(PartitionSchemeHeader *partition_scheme_header = nullptr)
+      : partition_scheme_header_(partition_scheme_header) {}
+
+  /**
+   * @brief Clone a copy of the output partition scheme header.
+   *
+   * @return A newly allocated copy of the header, or NULL if this node has
+   *         none. Caller should take ownership of the returned object.
+   **/
+  PartitionSchemeHeader* cloneOutputPartitionSchemeHeader() const {
+    if (partition_scheme_header_) {
+      return new PartitionSchemeHeader(*partition_scheme_header_);
+    }
+
+    return nullptr;
+  }
+
+  std::unique_ptr<PartitionSchemeHeader> partition_scheme_header_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(Physical);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Selection.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.cpp b/query_optimizer/physical/Selection.cpp
index 36ade04..e2fa69e 100644
--- a/query_optimizer/physical/Selection.cpp
+++ b/query_optimizer/physical/Selection.cpp
@@ -19,27 +19,58 @@
 
 #include "query_optimizer/physical/Selection.hpp"
 
+#include <memory>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/Physical.hpp"
 #include "utility/Cast.hpp"
 
 #include "glog/logging.h"
 
+using std::unordered_set;
+
 namespace quickstep {
 namespace optimizer {
 namespace physical {
 
 namespace E = ::quickstep::optimizer::expressions;
 
+SelectionPtr Selection::Create(
+      const PhysicalPtr &input,
+      const std::vector<E::NamedExpressionPtr> &project_expressions,
+      const E::PredicatePtr &filter_predicate,
+      PartitionSchemeHeader *output_partition_scheme_header) {
+  std::unique_ptr<PartitionSchemeHeader> partition_scheme_header(output_partition_scheme_header);
+  // An explicitly supplied header wins; otherwise try to inherit the input's header.
+  if (!partition_scheme_header) {
+    const PartitionSchemeHeader *input_partition_scheme_header = input->getOutputPartitionSchemeHeader();
+    if (input_partition_scheme_header) {
+      unordered_set<E::ExprId> project_expr_ids;
+      for (const E::NamedExpressionPtr &project_expression : project_expressions) {
+        project_expr_ids.insert(project_expression->id());
+      }
+      // Copy the input header only if it reports itself reusable for the projected ids.
+      if (input_partition_scheme_header->reusablePartitionScheme(project_expr_ids)) {
+        partition_scheme_header = std::make_unique<PartitionSchemeHeader>(*input_partition_scheme_header);
+      }
+    }
+  }
+  // Hand the header (possibly NULL) over to the new Selection node.
+  return SelectionPtr(
+      new Selection(input, project_expressions, filter_predicate, partition_scheme_header.release()));
+}
+
 PhysicalPtr Selection::copyWithNewChildren(
     const std::vector<PhysicalPtr> &new_children) const {
   DCHECK_EQ(children().size(), new_children.size());
-  return Create(new_children[0], project_expressions_, filter_predicate_);
+  return Create(new_children[0], project_expressions_, filter_predicate_, cloneOutputPartitionSchemeHeader());
 }
 
 std::vector<E::AttributeReferencePtr> Selection::getOutputAttributes() const {
@@ -76,7 +107,7 @@ bool Selection::maybeCopyWithPrunedExpressions(
     }
   }
   if (new_project_expressions.size() != project_expressions_.size()) {
-    *output = Create(input(), new_project_expressions, filter_predicate_);
+    *output = Create(input(), new_project_expressions, filter_predicate_, cloneOutputPartitionSchemeHeader());
     return true;
   }
   return false;
@@ -89,6 +120,11 @@ void Selection::getFieldStringItems(
     std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
     std::vector<std::string> *container_child_field_names,
     std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  if (partition_scheme_header_) {
+    inline_field_names->push_back("output_partition_scheme_header");
+    inline_field_values->push_back(partition_scheme_header_->toString());
+  }
+
   non_container_child_field_names->emplace_back("input");
   non_container_child_fields->emplace_back(input());
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/Selection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp
index b6874a1..204eb2f 100644
--- a/query_optimizer/physical/Selection.hpp
+++ b/query_optimizer/physical/Selection.hpp
@@ -42,6 +42,8 @@ namespace physical {
  *  @{
  */
 
+struct PartitionSchemeHeader;
+
 class Selection;
 typedef std::shared_ptr<const Selection> SelectionPtr;
 
@@ -82,6 +84,11 @@ class Selection : public Physical {
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
 
+  PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+      PartitionSchemeHeader *partition_scheme_header) const override {  // Rebuilds via Create().
+    return Create(input(), project_expressions_, filter_predicate_, partition_scheme_header);
+  }
+
   bool maybeCopyWithPrunedExpressions(
       const expressions::UnorderedNamedExpressionSet &referenced_attributes,
       PhysicalPtr *output) const override;
@@ -92,15 +99,17 @@ class Selection : public Physical {
    * @param input The input node.
    * @param project_expressions The project expressions.
    * @param filter_predicate The filter predicate. Can be NULL.
+   * @param output_partition_scheme_header If not NULL, the partition scheme
+   *        header that overrides the one from input. Create() takes ownership
+   *        of 'output_partition_scheme_header'.
+   *
    * @return An immutable Selection.
    */
   static SelectionPtr Create(
       const PhysicalPtr &input,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const expressions::PredicatePtr &filter_predicate) {
-    return SelectionPtr(
-        new Selection(input, project_expressions, filter_predicate));
-  }
+      const expressions::PredicatePtr &filter_predicate,
+      PartitionSchemeHeader *output_partition_scheme_header = nullptr);
 
   /**
    * @brief Creates a conjunctive predicate with \p filter_predicates
@@ -140,15 +149,17 @@ class Selection : public Physical {
   Selection(
       const PhysicalPtr &input,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const expressions::PredicatePtr &filter_predicate)
-      : project_expressions_(project_expressions),
+      const expressions::PredicatePtr &filter_predicate,
+      PartitionSchemeHeader *partition_scheme_header)
+      : Physical(partition_scheme_header),
+        project_expressions_(project_expressions),
         filter_predicate_(filter_predicate) {
     addChild(input);
   }
 
-  std::vector<expressions::NamedExpressionPtr> project_expressions_;
+  const std::vector<expressions::NamedExpressionPtr> project_expressions_;
   // Can be NULL. If NULL, the filter predicate is treated as the literal true.
-  expressions::PredicatePtr filter_predicate_;
+  const expressions::PredicatePtr filter_predicate_;
 
   DISALLOW_COPY_AND_ASSIGN(Selection);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableGenerator.hpp b/query_optimizer/physical/TableGenerator.hpp
index c9ff8a8..4d6419f 100644
--- a/query_optimizer/physical/TableGenerator.hpp
+++ b/query_optimizer/physical/TableGenerator.hpp
@@ -29,6 +29,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "utility/Cast.hpp"
@@ -100,6 +101,12 @@ class TableGenerator : public Physical {
     return {};
   }
 
+  PhysicalPtr copyWithNewOutputPartitionSchemeHeader(
+      PartitionSchemeHeader *partition_scheme_header) const override {
+    return TableGeneratorPtr(  // Same generator handle, alias and attributes; only the header argument is new.
+        new TableGenerator(generator_function_handle_, table_alias_, attribute_list_, partition_scheme_header));
+  }
+
   void getFieldStringItems(
       std::vector<std::string> *inline_field_names,
       std::vector<std::string> *inline_field_values,
@@ -116,6 +123,11 @@ class TableGenerator : public Physical {
       inline_field_values->push_back(table_alias_);
     }
 
+    if (partition_scheme_header_) {
+      inline_field_names->push_back("output_partition_scheme_header");
+      inline_field_values->push_back(partition_scheme_header_->toString());
+    }
+
     container_child_field_names->push_back("");
     container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(attribute_list_));
   }
@@ -139,8 +151,10 @@ class TableGenerator : public Physical {
  private:
   TableGenerator(const GeneratorFunctionHandlePtr &generator_function_handle,
                  const std::string &table_alias,
-                 const std::vector<E::AttributeReferencePtr> &attribute_list)
-      : generator_function_handle_(generator_function_handle),
+                 const std::vector<E::AttributeReferencePtr> &attribute_list,
+                 PartitionSchemeHeader *partition_scheme_header = nullptr)
+      : Physical(partition_scheme_header),
+        generator_function_handle_(generator_function_handle),
         table_alias_(table_alias),
         attribute_list_(attribute_list) {
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableReference.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.cpp b/query_optimizer/physical/TableReference.cpp
index bfd6464..cd6bba4 100644
--- a/query_optimizer/physical/TableReference.cpp
+++ b/query_optimizer/physical/TableReference.cpp
@@ -19,11 +19,17 @@
 
 #include "query_optimizer/physical/TableReference.hpp"
 
+#include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
+#include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
+#include "query_optimizer/physical/Physical.hpp"
 #include "utility/Cast.hpp"
 
 namespace quickstep {
@@ -32,6 +38,50 @@ namespace physical {
 
 namespace E = ::quickstep::optimizer::expressions;
 
+TableReferencePtr TableReference::Create(
+    const CatalogRelation *relation,
+    const std::string &alias,
+    const std::vector<expressions::AttributeReferencePtr> &attribute_list) {
+  std::unique_ptr<PartitionSchemeHeader> physical_header;  // Stays null when the relation has no partition scheme.
+  if (const quickstep::PartitionScheme *catalog_scheme = relation->getPartitionScheme()) {
+    const quickstep::PartitionSchemeHeader &catalog_header = catalog_scheme->getPartitionSchemeHeader();
+
+    // Map the catalog-level partition type onto the optimizer-level enum of the same name.
+    PartitionSchemeHeader::PartitionType partition_type;
+    switch (catalog_header.getPartitionType()) {
+      case quickstep::PartitionSchemeHeader::PartitionType::kHash:
+        partition_type = PartitionSchemeHeader::PartitionType::kHash;
+        break;
+      case quickstep::PartitionSchemeHeader::PartitionType::kRange:
+        partition_type = PartitionSchemeHeader::PartitionType::kRange;
+        break;
+      default:
+        return nullptr;  // Any other partition type produces no node at all.
+    }
+
+    PartitionSchemeHeader::PartitionExprIds expr_ids;  // One entry per catalog partition attribute.
+    for (const attribute_id attr_id : catalog_header.getPartitionAttributeIds()) {
+      expr_ids.push_back({ attribute_list[attr_id]->id() });
+    }
+
+    physical_header =
+        std::make_unique<PartitionSchemeHeader>(partition_type,
+                                                catalog_header.getNumPartitions(),
+                                                std::move(expr_ids));
+  }
+
+  return TableReferencePtr(new TableReference(relation, alias, attribute_list,
+                                              physical_header.release()));
+}
+
+PhysicalPtr TableReference::copyWithNewChildren(
+    const std::vector<PhysicalPtr> &new_children) const {
+  DCHECK_EQ(new_children.size(), children().size());
+  // 'new_children' is only size-checked; the copy is built from this node's own fields.
+  return TableReferencePtr(new TableReference(relation_, alias_, attribute_list_,
+                                              cloneOutputPartitionSchemeHeader()));
+}
+
 void TableReference::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
@@ -47,6 +97,11 @@ void TableReference::getFieldStringItems(
     inline_field_values->push_back(alias_);
   }
 
+  if (partition_scheme_header_) {
+    inline_field_names->push_back("output_partition_scheme_header");
+    inline_field_values->push_back(partition_scheme_header_->toString());
+  }
+
   container_child_field_names->push_back("");
   container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(attribute_list_));
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/physical/TableReference.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TableReference.hpp b/query_optimizer/physical/TableReference.hpp
index 638d73b..bdc0578 100644
--- a/query_optimizer/physical/TableReference.hpp
+++ b/query_optimizer/physical/TableReference.hpp
@@ -20,7 +20,6 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_TABLE_REFERENCE_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_TABLE_REFERENCE_HPP_
 
-#include <memory>
 #include <string>
 #include <vector>
 
@@ -45,6 +44,7 @@ namespace physical {
  *  @{
  */
 
+struct PartitionSchemeHeader;
 class TableReference;
 typedef std::shared_ptr<const TableReference> TableReferencePtr;
 
@@ -70,10 +70,7 @@ class TableReference : public Physical {
   }
 
   PhysicalPtr copyWithNewChildren(
-      const std::vector<PhysicalPtr> &new_children) const override {
-    DCHECK_EQ(new_children.size(), children().size());
-    return TableReferencePtr(new TableReference(relation_, alias_, attribute_list_));
-  }
+      const std::vector<PhysicalPtr> &new_children) const override;
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
     return attribute_list_;
@@ -101,9 +98,7 @@ class TableReference : public Physical {
   static TableReferencePtr Create(
       const CatalogRelation *relation,
       const std::string &alias,
-      const std::vector<expressions::AttributeReferencePtr> &attribute_list) {
-    return TableReferencePtr(new TableReference(relation, alias, attribute_list));
-  }
+      const std::vector<expressions::AttributeReferencePtr> &attribute_list);
 
  protected:
   void getFieldStringItems(
@@ -117,14 +112,16 @@ class TableReference : public Physical {
  private:
   TableReference(
       const CatalogRelation *relation, const std::string &alias,
-      const std::vector<expressions::AttributeReferencePtr> &attribute_list)
-      : relation_(relation),
+      const std::vector<expressions::AttributeReferencePtr> &attribute_list,
+      PartitionSchemeHeader *partition_scheme_header)
+      : Physical(partition_scheme_header),
+        relation_(relation),
         alias_(alias),
         attribute_list_(attribute_list) {}
 
   const CatalogRelation *relation_;
-  std::string alias_;
-  std::vector<expressions::AttributeReferencePtr> attribute_list_;
+  const std::string alias_;
+  const std::vector<expressions::AttributeReferencePtr> attribute_list_;
 
   DISALLOW_COPY_AND_ASSIGN(TableReference);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 70302be..1ad8f40 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -391,5 +391,5 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_rules_TopDownRule
-                      quickstep_queryoptimizer_rules_UpdateExpression
-                      quickstep_queryoptimizer_rules_UnnestSubqueries)
+                      quickstep_queryoptimizer_rules_UnnestSubqueries
+                      quickstep_queryoptimizer_rules_UpdateExpression)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 70bb185..d3e7e08 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -463,6 +463,8 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
 
 
 void HashInnerJoinWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
@@ -694,6 +696,8 @@ void HashInnerJoinWorkOrder::executeWithCopyElision(ValueAccessor *probe_accesso
 }
 
 void HashSemiJoinWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   if (residual_predicate_ == nullptr) {
     executeWithoutResidualPredicate();
   } else {
@@ -1018,6 +1022,8 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
 }
 
 void HashOuterJoinWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   const relation_id build_relation_id = build_relation_.getID();
   const relation_id probe_relation_id = probe_relation_.getID();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 6391847..23267f8 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -35,6 +35,7 @@
 #include "relational_operators/WorkOrder.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/HashTable.hpp"
+#include "storage/InsertDestination.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
 #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
@@ -48,7 +49,6 @@ namespace tmb { class MessageBus; }
 namespace quickstep {
 
 class CatalogRelationSchema;
-class InsertDestination;
 class Predicate;
 class Scalar;
 class StorageManager;
@@ -712,6 +712,8 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   ~HashAntiJoinWorkOrder() override {}
 
   void execute() override {
+    output_destination_->setInputPartitionId(part_id_);
+
     if (residual_predicate_ == nullptr) {
       executeWithoutResidualPredicate();
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index 935b104..845c563 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -75,7 +75,7 @@ bool SelectOperator::getAllWorkOrders(
         }
 #endif  // QUICKSTEP_HAVE_LIBNUMA
         container->addNormalWorkOrder(
-            new SelectWorkOrder(query_id_, input_relation_, input_block_id, predicate, simple_projection_,
+            new SelectWorkOrder(query_id_, input_relation_, part_id, input_block_id, predicate, simple_projection_,
                                 simple_selection_, selection, output_destination, storage_manager,
                                 CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node),
             op_index_);
@@ -95,7 +95,7 @@ bool SelectOperator::getAllWorkOrders(
         }
 #endif  // QUICKSTEP_HAVE_LIBNUMA
         container->addNormalWorkOrder(
-            new SelectWorkOrder(query_id_, input_relation_, block, predicate, simple_projection_,
+            new SelectWorkOrder(query_id_, input_relation_, part_id, block, predicate, simple_projection_,
                                 simple_selection_, selection, output_destination, storage_manager,
                                 CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context), numa_node),
             op_index_);
@@ -114,7 +114,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
 
     for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       for (const block_id input_block_id : input_relation_block_ids_[part_id]) {
-        container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+        container->addWorkOrderProto(createWorkOrderProto(part_id, input_block_id), op_index_);
       }
     }
     started_ = true;
@@ -123,7 +123,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
     for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       while (num_workorders_generated_[part_id] < input_relation_block_ids_[part_id].size()) {
         container->addWorkOrderProto(
-            createWorkOrderProto(input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
+            createWorkOrderProto(part_id, input_relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
             op_index_);
         ++num_workorders_generated_[part_id];
       }
@@ -132,7 +132,7 @@ bool SelectOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container)
   }
 }
 
-serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* SelectOperator::createWorkOrderProto(const partition_id part_id, const block_id block) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::SELECT);
   proto->set_query_id(query_id_);
@@ -140,6 +140,7 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
   proto->SetExtension(serialization::SelectWorkOrder::relation_id, input_relation_.getID());
   proto->SetExtension(serialization::SelectWorkOrder::insert_destination_index, output_destination_index_);
   proto->SetExtension(serialization::SelectWorkOrder::predicate_index, predicate_index_);
+  proto->SetExtension(serialization::SelectWorkOrder::partition_id, part_id);
   proto->SetExtension(serialization::SelectWorkOrder::block_id, block);
   proto->SetExtension(serialization::SelectWorkOrder::simple_projection, simple_projection_);
   if (simple_projection_) {
@@ -158,6 +159,8 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
 }
 
 void SelectWorkOrder::execute() {
+  output_destination_->setInputPartitionId(part_id_);
+
   BlockReference block(
       storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index b8161b7..c11d681 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -226,9 +226,10 @@ class SelectOperator : public RelationalOperator {
   /**
    * @brief Create Work Order proto.
    *
+   * @param part_id The partition id.
    * @param block The block id used in the Work Order.
    **/
-  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+  serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
 
   const CatalogRelation &input_relation_;
   const CatalogRelation &output_relation_;
@@ -268,6 +269,7 @@ class SelectWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param input_relation The relation to perform selection over.
+   * @param part_id The partition id.
    * @param input_block_id The block id.
    * @param predicate All tuples matching \c predicate will be selected (or NULL
    *        to select all tuples).
@@ -283,6 +285,7 @@ class SelectWorkOrder : public WorkOrder {
    **/
   SelectWorkOrder(const std::size_t query_id,
                   const CatalogRelationSchema &input_relation,
+                  const partition_id part_id,
                   const block_id input_block_id,
                   const Predicate *predicate,
                   const bool simple_projection,
@@ -294,6 +297,7 @@ class SelectWorkOrder : public WorkOrder {
                   const numa_node_id numa_node = 0)
       : WorkOrder(query_id),
         input_relation_(input_relation),
+        part_id_(part_id),
         input_block_id_(input_block_id),
         predicate_(predicate),
         simple_projection_(simple_projection),
@@ -313,6 +317,7 @@ class SelectWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param input_relation The relation to perform selection over.
+   * @param part_id The partition id.
    * @param input_block_id The block id.
    * @param predicate All tuples matching \c predicate will be selected (or NULL
    *        to select all tuples).
@@ -328,6 +333,7 @@ class SelectWorkOrder : public WorkOrder {
    **/
   SelectWorkOrder(const std::size_t query_id,
                   const CatalogRelationSchema &input_relation,
+                  const partition_id part_id,
                   const block_id input_block_id,
                   const Predicate *predicate,
                   const bool simple_projection,
@@ -339,6 +345,7 @@ class SelectWorkOrder : public WorkOrder {
                   const numa_node_id numa_node = 0)
       : WorkOrder(query_id),
         input_relation_(input_relation),
+        part_id_(part_id),
         input_block_id_(input_block_id),
         predicate_(predicate),
         simple_projection_(simple_projection),
@@ -364,6 +371,7 @@ class SelectWorkOrder : public WorkOrder {
 
  private:
   const CatalogRelationSchema &input_relation_;
+  const partition_id part_id_;
   const block_id input_block_id_;
   const Predicate *predicate_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 18f0589..42a0e7d 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -238,12 +238,14 @@ message SaveBlocksWorkOrder {
   }
 }
 
+// Next tag: 250.
 message SelectWorkOrder {
   extend WorkOrder {
     // All required.
     optional int32 relation_id = 240;
     optional int32 insert_destination_index = 241;
     optional int32 predicate_index = 242;
+    optional uint64 partition_id = 249;
     optional fixed64 block_id = 243;
     optional bool simple_projection = 244;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 48bf956..2b0bae4 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -434,7 +434,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::SELECT: {
-      LOG(INFO) << "Creating SelectWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+      const partition_id part_id =
+          proto.GetExtension(serialization::SelectWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating SelectWorkOrder (Partition " << part_id << ") for Query " << query_id
+                << " in Shiftboss " << shiftboss_index;
+
       const bool simple_projection =
           proto.GetExtension(serialization::SelectWorkOrder::simple_projection);
       vector<attribute_id> simple_selection;
@@ -447,6 +452,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SelectWorkOrder::relation_id)),
+          part_id,
           proto.GetExtension(serialization::SelectWorkOrder::block_id),
           query_context->getPredicate(
               proto.GetExtension(serialization::SelectWorkOrder::predicate_index)),
@@ -913,6 +919,7 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              proto.HasExtension(serialization::SelectWorkOrder::predicate_index) &&
              query_context.isValidPredicate(
                  proto.GetExtension(serialization::SelectWorkOrder::predicate_index)) &&
+             proto.HasExtension(serialization::SelectWorkOrder::partition_id) &&
              proto.HasExtension(serialization::SelectWorkOrder::block_id);
     }
     case serialization::SORT_MERGE_RUN: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 4296ba0..f33a4f4 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -776,6 +776,7 @@ target_link_libraries(quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlockLayout
                       quickstep_storage_StorageManager
                       quickstep_storage_TupleIdSequence
+                      quickstep_storage_ValueAccessor
                       quickstep_storage_ValueAccessorUtil
                       quickstep_threading_SpinMutex
                       quickstep_threading_ThreadIDBasedMap

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index aeef08a..3caa80f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -40,6 +40,7 @@
 #include "storage/StorageBlockLayout.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/TupleIdSequence.hpp"
+#include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "threading/SpinMutex.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
@@ -573,11 +574,7 @@ PartitionSchemeHeader::PartitionAttributeIds PartitionAwareInsertDestination::ge
 }
 
 void PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
-  PartitionSchemeHeader::PartitionValues values;
-  for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-    values.push_back(tuple.getAttributeValue(attr_id));
-  }
-  const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+  const partition_id part_id = getPartitionId(tuple);
 
   MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
 
@@ -596,11 +593,7 @@ void PartitionAwareInsertDestination::insertTuple(const Tuple &tuple) {
 }
 
 void PartitionAwareInsertDestination::insertTupleInBatch(const Tuple &tuple) {
-  PartitionSchemeHeader::PartitionValues values;
-  for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-    values.push_back(tuple.getAttributeValue(attr_id));
-  }
-  const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+  const partition_id part_id = getPartitionId(tuple);
 
   MutableBlockReference output_block = getBlockForInsertionInPartition(part_id);
 
@@ -637,12 +630,7 @@ void PartitionAwareInsertDestination::bulkInsertTuples(ValueAccessor *accessor,
     // set a bit in the appropriate TupleIdSequence.
     accessor->beginIteration();
     while (accessor->next()) {
-      PartitionSchemeHeader::PartitionValues values;
-      for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-        values.push_back(accessor->getTypedValue(attr_id));
-      }
-      partition_membership[partition_scheme_header_->getPartitionId(values)]
-          ->set(accessor->getCurrentPosition(), true);
+      partition_membership[this->getPartitionId(accessor)]->set(accessor->getCurrentPosition(), true);
     }
 
     // For each partition, create an adapter around Value Accessor and
@@ -693,12 +681,7 @@ void PartitionAwareInsertDestination::bulkInsertTuplesWithRemappedAttributes(
     // set a bit in the appropriate TupleIdSequence.
     accessor->beginIteration();
     while (accessor->next()) {
-      PartitionSchemeHeader::PartitionValues values;
-      for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-        values.push_back(accessor->getTypedValue(attr_id));
-      }
-      partition_membership[partition_scheme_header_->getPartitionId(values)]
-          ->set(accessor->getCurrentPosition(), true);
+      partition_membership[this->getPartitionId(accessor)]->set(accessor->getCurrentPosition(), true);
     }
 
     // For each partition, create an adapter around Value Accessor and
@@ -735,12 +718,7 @@ void PartitionAwareInsertDestination::insertTuplesFromVector(std::vector<Tuple>:
   }
 
   for (; begin != end; ++begin) {
-    PartitionSchemeHeader::PartitionValues values;
-    for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
-      values.push_back(begin->getAttributeValue(attr_id));
-    }
-
-    const partition_id part_id = partition_scheme_header_->getPartitionId(values);
+    const partition_id part_id = getPartitionId(*begin);
 
     MutableBlockReference dest_block = getBlockForInsertionInPartition(part_id);
     // FIXME(chasseur): Deal with TupleTooLargeForBlock exception.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index db58116..878ebb4 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -36,6 +36,7 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageBlockLayout.hpp"
+#include "storage/ValueAccessor.hpp"
 #include "threading/SpinMutex.hpp"
 #include "threading/ThreadIDBasedMap.hpp"
 #include "types/containers/Tuple.hpp"
@@ -53,7 +54,6 @@ namespace tmb { class MessageBus; }
 namespace quickstep {
 
 class StorageManager;
-class ValueAccessor;
 
 namespace merge_run_operator {
 class RunCreator;
@@ -201,6 +201,14 @@ class InsertDestination : public InsertDestinationInterface {
   virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
                                         std::vector<partition_id> *part_ids) = 0;
 
+  /**
+   * @brief Set the input partition id, which is used as the partition id of
+   *        inserted tuples when the partition attributes are empty.
+   *
+   * @param input_partition_id The input partition id.
+   **/
+  virtual void setInputPartitionId(const partition_id input_partition_id) {}
+
  protected:
   /**
    * @brief Get a block to use for insertion.
@@ -541,6 +549,10 @@ class PartitionAwareInsertDestination : public InsertDestination {
   void insertTuplesFromVector(std::vector<Tuple>::const_iterator begin,
                               std::vector<Tuple>::const_iterator end) override;
 
+  void setInputPartitionId(const partition_id input_partition_id) override {
+    input_partition_id_ = input_partition_id;
+  }
+
  protected:
   MutableBlockReference getBlockForInsertion() override {
     LOG(FATAL) << "PartitionAwareInsertDestination::getBlockForInsertion needs a partition id as an argument.";
@@ -599,6 +611,24 @@ class PartitionAwareInsertDestination : public InsertDestination {
     available_block_refs_[part_id].clear();
   }
 
+  partition_id getPartitionId(const Tuple &tuple) const {
+    PartitionSchemeHeader::PartitionValues values;
+    for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+      values.push_back(tuple.getAttributeValue(attr_id));
+    }
+
+    return values.empty() ? input_partition_id_ : partition_scheme_header_->getPartitionId(values);
+  }
+
+  partition_id getPartitionId(ValueAccessor *accessor) const {
+    PartitionSchemeHeader::PartitionValues values;
+    for (const attribute_id attr_id : partition_scheme_header_->getPartitionAttributeIds()) {
+      values.push_back(accessor->getTypedValueVirtual(attr_id));
+    }
+
+    return values.empty() ? input_partition_id_ : partition_scheme_header_->getPartitionId(values);
+  }
+
   std::unique_ptr<const PartitionSchemeHeader> partition_scheme_header_;
 
   // A vector of available block references for each partition.
@@ -612,6 +642,8 @@ class PartitionAwareInsertDestination : public InsertDestination {
   // Mutex for locking each partition separately.
   SpinMutex *mutexes_for_partition_;
 
+  partition_id input_partition_id_ = 0u;
+
   DISALLOW_COPY_AND_ASSIGN(PartitionAwareInsertDestination);
 };
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index e1fb770..16a83ee 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -284,6 +284,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer
                       quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_physical_PartitionSchemeHeader
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_queryoptimizer_physical_TableReference

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5fbfd211/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index f8bf6f8..98de011 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -34,6 +34,7 @@
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PartitionSchemeHeader.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
@@ -189,6 +190,11 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     }
   }
 
+  const P::PartitionSchemeHeader *partition_scheme_header = input->getOutputPartitionSchemeHeader();
+  if (partition_scheme_header) {
+    node_info.labels.emplace_back(partition_scheme_header->toString());
+  }
+
   if (lip_filter_conf_ != nullptr) {
     const auto &build_filters = lip_filter_conf_->getBuildInfoMap();
     const auto build_it = build_filters.find(input);