You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/02/05 22:04:10 UTC

[72/72] incubator-quickstep git commit: Added limited optimizer support for Partitioned Hash Joins.

Added limited optimizer support for Partitioned Hash Joins.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ce6a20b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ce6a20b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ce6a20b

Branch: refs/heads/reorder-partitioned-hash-join
Commit: 5ce6a20b7bcc2c0e363cc85bf9ec8016dca2eaf9
Parents: 27a8055
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Jan 25 01:49:28 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Sun Feb 5 14:02:37 2017 -0800

----------------------------------------------------------------------
 query_execution/QueryContext.cpp                |   2 +
 query_optimizer/ExecutionGenerator.cpp          | 372 ++++++++++++++++---
 query_optimizer/ExecutionGenerator.hpp          |   7 +-
 .../tests/execution_generator/Partition.test    |  71 +++-
 types/TypedValue.hpp                            |  19 +
 5 files changed, 414 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_execution/QueryContext.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 71839a7..3681a3b 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -54,6 +54,8 @@ using std::vector;
 
 namespace quickstep {
 
+constexpr QueryContext::insert_destination_id QueryContext::kInvalidInsertDestinationId;
+
 QueryContext::QueryContext(const serialization::QueryContext &proto,
                            const CatalogDatabaseLite &database,
                            StorageManager *storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 6918313..828d21f 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -134,8 +134,12 @@
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+using std::find;
+using std::make_unique;
 using std::move;
+using std::size_t;
 using std::static_pointer_cast;
+using std::swap;
 using std::unique_ptr;
 using std::unordered_map;
 using std::vector;
@@ -159,6 +163,8 @@ static const volatile bool aggregate_hashtable_type_dummy
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
+DEFINE_uint64(num_repartitions, 4, "Number of repartitions for a hash join.");
+
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
@@ -421,7 +427,8 @@ void ExecutionGenerator::convertTableReference(
       std::piecewise_construct,
       std::forward_as_tuple(physical_table_reference),
       std::forward_as_tuple(CatalogRelationInfo::kInvalidOperatorIndex,
-                            catalog_relation));
+                            catalog_relation,
+                            QueryContext::kInvalidInsertDestinationId));
 }
 
 void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
@@ -461,8 +468,9 @@ void ExecutionGenerator::convertSample(const P::SamplePtr &physical_sample) {
       std::piecewise_construct,
       std::forward_as_tuple(physical_sample),
       std::forward_as_tuple(sample_index,
-                            output_relation));
-  temporary_relation_info_vec_.emplace_back(sample_index, output_relation);
+                            output_relation,
+                            insert_destination_index));
+  temporary_relation_info_vec_.emplace_back(sample_index, output_relation, insert_destination_index);
 }
 
 bool ExecutionGenerator::convertSimpleProjection(
@@ -595,8 +603,9 @@ void ExecutionGenerator::convertSelection(
       std::piecewise_construct,
       std::forward_as_tuple(physical_selection),
       std::forward_as_tuple(select_index,
-                            output_relation));
-  temporary_relation_info_vec_.emplace_back(select_index, output_relation);
+                            output_relation,
+                            insert_destination_index));
+  temporary_relation_info_vec_.emplace_back(select_index, output_relation, insert_destination_index);
 
   if (lip_filter_generator_ != nullptr) {
     lip_filter_generator_->addSelectionInfo(physical_selection, select_index);
@@ -679,6 +688,64 @@ void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan
                                            build_filter_operator_index);
 }
 
+namespace {
+// True iff both headers agree on partition type, partition count, attribute type and (if range) boundaries.
+bool areSamePartitionSchemeHeaders(const PartitionSchemeHeader &lhs_partition_header,
+                                   const CatalogRelationSchema &lhs_scheme,
+                                   const PartitionSchemeHeader &rhs_partition_header,
+                                   const CatalogRelationSchema &rhs_scheme) {
+  if (lhs_partition_header.getPartitionType() != rhs_partition_header.getPartitionType()) {
+    return false;
+  }
+
+  if (lhs_partition_header.getNumPartitions() != rhs_partition_header.getNumPartitions()) {
+    return false;
+  }
+
+  // The partition attribute of each side is looked up in its own schema; their types must be equal.
+  if (!lhs_scheme.getAttributeById(lhs_partition_header.getPartitionAttributeId())->getType().equals(
+       rhs_scheme.getAttributeById(rhs_partition_header.getPartitionAttributeId())->getType())) {
+    return false;
+  }
+
+  switch (lhs_partition_header.getPartitionType()) {
+    case PartitionSchemeHeader::PartitionType::kHash:
+      return true;  // Nothing further is compared for hash partitioning.
+    case PartitionSchemeHeader::PartitionType::kRange: {  // rhs reports kRange too: the types compared equal above.
+      const vector<TypedValue> &lhs_ranges =
+          static_cast<const RangePartitionSchemeHeader&>(lhs_partition_header).getPartitionRangeBoundaries();
+      const vector<TypedValue> &rhs_ranges =
+          static_cast<const RangePartitionSchemeHeader&>(rhs_partition_header).getPartitionRangeBoundaries();
+
+      return lhs_ranges == rhs_ranges;  // Element-wise vector comparison; needs TypedValue::operator==.
+    }
+  }
+
+  return false;  // Only reached if the partition type matches neither case above.
+}
+
+// Returns the index into 'join_attributes' of the attribute with the most distinct values per 'stats',
+// or 0 when no join attribute has a distinct-value statistic greater than zero. To be deprecated once
+// the partition scheme header supports multiple partition attributes.
+size_t chooseBestRepartitionAttributeIndex(const CatalogRelationStatistics &stats,
+                                           const vector<attribute_id> &join_attributes) {
+  size_t chose_attr_index = static_cast<size_t>(-1);  // Sentinel: no attribute chosen yet.
+  size_t chose_attr_num_distinct_values = 0;
+
+  for (std::size_t i = 0; i < join_attributes.size(); ++i) {
+    const attribute_id attr = join_attributes[i];
+    if (stats.hasNumDistinctValues(attr) &&
+        stats.getNumDistinctValues(attr) > chose_attr_num_distinct_values) {  // Strict '>': first wins ties.
+      chose_attr_index = i;
+      chose_attr_num_distinct_values = stats.getNumDistinctValues(attr);
+    }
+  }
+
+  return (chose_attr_index != static_cast<size_t>(-1)) ? chose_attr_index : 0;  // Fall back to index 0.
+}
+
+}  // namespace
+
 void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   // HashJoin is converted to three operators:
   //     BuildHash, HashJoin, DestroyHash. The second is the primary operator.
@@ -689,13 +756,10 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   std::vector<attribute_id> probe_attribute_ids;
   std::vector<attribute_id> build_attribute_ids;
 
-  std::size_t build_cardinality =
-      cost_model_for_hash_join_->estimateCardinality(build_physical);
-
   bool any_probe_attributes_nullable = false;
   bool any_build_attributes_nullable = false;
 
-  const std::vector<E::AttributeReferencePtr> &left_join_attributes =
+  std::vector<E::AttributeReferencePtr> left_join_attributes =
       physical_plan->left_join_attributes();
   for (const E::AttributeReferencePtr &left_join_attribute : left_join_attributes) {
     const CatalogAttribute *probe_catalog_attribute
@@ -707,7 +771,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
     }
   }
 
-  const std::vector<E::AttributeReferencePtr> &right_join_attributes =
+  std::vector<E::AttributeReferencePtr> right_join_attributes =
       physical_plan->right_join_attributes();
   for (const E::AttributeReferencePtr &right_join_attribute : right_join_attributes) {
     const CatalogAttribute *build_catalog_attribute
@@ -733,6 +797,218 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
     key_types.push_back(&left_attribute_type);
   }
 
+  const CatalogRelationInfo *build_relation_info =
+      findRelationInfoOutputByPhysical(build_physical);
+  const CatalogRelationInfo *probe_operator_info =
+      findRelationInfoOutputByPhysical(probe_physical);
+
+  const CatalogRelation *build_relation = build_relation_info->relation;
+  const CatalogRelation *probe_relation = probe_operator_info->relation;
+
+  // FIXME(quickstep-team): Add support for self-join.
+  if (build_relation == probe_relation) {
+    THROW_SQL_ERROR() << "Self-join is not supported";
+  }
+
+  const PartitionScheme *build_partition_scheme = build_relation->getPartitionScheme();
+  const PartitionScheme *probe_partition_scheme = probe_relation->getPartitionScheme();
+
+  bool build_needs_repartition = false;
+  bool probe_needs_repartition = false;
+  bool needs_swap = false;
+  if (build_partition_scheme && probe_partition_scheme) {
+    const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader();
+    const PartitionSchemeHeader &probe_partition_scheme_header = probe_partition_scheme->getPartitionSchemeHeader();
+
+    switch (build_partition_scheme_header.getPartitionType()) {
+      case PartitionSchemeHeader::PartitionType::kRange:
+        build_needs_repartition = true;
+
+        switch (probe_partition_scheme_header.getPartitionType()) {
+          case PartitionSchemeHeader::PartitionType::kRange:
+            probe_needs_repartition = true;
+            break;
+          case PartitionSchemeHeader::PartitionType::kHash: {
+            const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId();
+            if (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) !=
+                    probe_attribute_ids.end()) {
+              needs_swap = true;
+            } else {
+              probe_needs_repartition = true;
+            }
+            break;
+          }
+        }
+        break;
+      case PartitionSchemeHeader::PartitionType::kHash: {
+        const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId();
+        if (find(build_attribute_ids.begin(), build_attribute_ids.end(), build_partition_attr) !=
+                build_attribute_ids.end()) {
+          // BuildRelation has a useful partition.
+          switch (probe_partition_scheme_header.getPartitionType()) {
+            case PartitionSchemeHeader::PartitionType::kRange:
+              probe_needs_repartition = true;
+              break;
+            case PartitionSchemeHeader::PartitionType::kHash: {
+              if (areSamePartitionSchemeHeaders(build_partition_scheme_header, *build_relation,
+                                                probe_partition_scheme_header, *probe_relation)) {
+                if (cost_model_for_hash_join_->estimateCardinality(build_physical) >
+                        cost_model_for_hash_join_->estimateCardinality(probe_physical)) {
+                  needs_swap = true;
+                }
+              } else {
+                probe_needs_repartition = true;
+              }
+              break;
+            }
+          }
+        } else {
+          build_needs_repartition = true;
+
+          switch (probe_partition_scheme_header.getPartitionType()) {
+            case PartitionSchemeHeader::PartitionType::kRange:
+              probe_needs_repartition = true;
+              break;
+            case PartitionSchemeHeader::PartitionType::kHash: {
+              const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId();
+              if (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) !=
+                      probe_attribute_ids.end()) {
+                needs_swap = true;
+              } else {
+                probe_needs_repartition = true;
+              }
+              break;
+            }
+          }
+        }
+        break;
+      }
+    }
+  } else if (probe_partition_scheme) {
+    needs_swap = true;
+
+    const PartitionSchemeHeader &probe_partition_scheme_header = probe_partition_scheme->getPartitionSchemeHeader();
+    switch (probe_partition_scheme_header.getPartitionType()) {
+      case PartitionSchemeHeader::PartitionType::kRange:
+        probe_needs_repartition = true;
+        break;
+      case PartitionSchemeHeader::PartitionType::kHash: {
+        const attribute_id probe_partition_attr = probe_partition_scheme_header.getPartitionAttributeId();
+
+        probe_needs_repartition =
+            (find(probe_attribute_ids.begin(), probe_attribute_ids.end(), probe_partition_attr) ==
+                probe_attribute_ids.end());
+        break;
+      }
+    }
+  } else if (build_partition_scheme) {
+    const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader();
+    switch (build_partition_scheme_header.getPartitionType()) {
+      case PartitionSchemeHeader::PartitionType::kRange:
+        build_needs_repartition = true;
+        break;
+      case PartitionSchemeHeader::PartitionType::kHash: {
+        const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId();
+        build_needs_repartition =
+            (find(build_attribute_ids.begin(), build_attribute_ids.end(), build_partition_attr) ==
+                build_attribute_ids.end());
+        break;
+      }
+    }
+  }
+
+  if (needs_swap) {
+    swap(probe_physical, build_physical);
+    swap(probe_attribute_ids, build_attribute_ids);
+    swap(any_probe_attributes_nullable, any_build_attributes_nullable);
+    swap(left_join_attributes, right_join_attributes);
+    swap(probe_operator_info, build_relation_info);
+    swap(probe_relation, build_relation);
+    swap(probe_partition_scheme, build_partition_scheme);
+    swap(probe_needs_repartition, build_needs_repartition);
+  }
+
+  if ((build_needs_repartition && build_relation_info->isStoredRelation()) ||
+      (probe_needs_repartition && probe_operator_info->isStoredRelation())) {
+    THROW_SQL_ERROR() << "Re-partition for the base table is not supported";
+  }
+
+  if (!build_needs_repartition && probe_needs_repartition) {
+    const PartitionSchemeHeader &build_partition_scheme_header = build_partition_scheme->getPartitionSchemeHeader();
+    const attribute_id build_partition_attr = build_partition_scheme_header.getPartitionAttributeId();
+
+    size_t repartition_attr_index = 0;
+    while (build_attribute_ids[repartition_attr_index] != build_partition_attr) {
+      ++repartition_attr_index;
+    }
+    auto probe_repartition_scheme_header =
+        make_unique<HashPartitionSchemeHeader>(build_partition_scheme_header.getNumPartitions(),
+                                               probe_attribute_ids[repartition_attr_index]);
+    auto probe_repartition_scheme = make_unique<PartitionScheme>(probe_repartition_scheme_header.release());
+
+    CatalogRelation *mutable_probe_relation =
+        catalog_database_->getRelationByIdMutable(probe_relation->getID());
+    mutable_probe_relation->setPartitionScheme(probe_repartition_scheme.release());
+
+    probe_partition_scheme = probe_relation->getPartitionScheme();
+    DCHECK_EQ(PartitionSchemeHeader::PartitionType::kHash,
+           probe_partition_scheme->getPartitionSchemeHeader().getPartitionType());
+
+    S::InsertDestination *probe_insert_destination_proto =
+        query_context_proto_->mutable_insert_destinations(probe_operator_info->output_destination_index);
+    probe_insert_destination_proto->Clear();
+
+    probe_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+    probe_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+        ->MergeFrom(probe_partition_scheme->getProto());
+  } else if (build_needs_repartition) {
+    const size_t repartition_attr_index =
+        chooseBestRepartitionAttributeIndex(build_relation->getStatistics(), build_attribute_ids);
+    auto build_repartition_scheme_header =
+        make_unique<HashPartitionSchemeHeader>(FLAGS_num_repartitions,
+                                               build_attribute_ids[repartition_attr_index]);
+    auto build_repartition_scheme = make_unique<PartitionScheme>(build_repartition_scheme_header.release());
+
+    CatalogRelation *mutable_build_relation =
+        catalog_database_->getRelationByIdMutable(build_relation->getID());
+    mutable_build_relation->setPartitionScheme(build_repartition_scheme.release());
+
+    build_partition_scheme = build_relation->getPartitionScheme();
+    DCHECK_EQ(PartitionSchemeHeader::PartitionType::kHash,
+           build_partition_scheme->getPartitionSchemeHeader().getPartitionType());
+
+    S::InsertDestination *build_insert_destination_proto =
+        query_context_proto_->mutable_insert_destinations(build_relation_info->output_destination_index);
+    build_insert_destination_proto->Clear();
+
+    build_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+    build_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+        ->MergeFrom(build_partition_scheme->getProto());
+
+    if (probe_needs_repartition) {
+      auto probe_repartition_scheme_header =
+          make_unique<HashPartitionSchemeHeader>(FLAGS_num_repartitions,
+                                                 probe_attribute_ids[repartition_attr_index]);
+      auto probe_repartition_scheme = make_unique<PartitionScheme>(probe_repartition_scheme_header.release());
+
+      CatalogRelation *mutable_probe_relation =
+          catalog_database_->getRelationByIdMutable(probe_relation->getID());
+      mutable_probe_relation->setPartitionScheme(probe_repartition_scheme.release());
+
+      probe_partition_scheme = probe_relation->getPartitionScheme();
+      DCHECK_EQ(PartitionSchemeHeader::PartitionType::kHash,
+             probe_partition_scheme->getPartitionSchemeHeader().getPartitionType());
+
+      S::InsertDestination *probe_insert_destination_proto =
+          query_context_proto_->mutable_insert_destinations(probe_operator_info->output_destination_index);
+      probe_insert_destination_proto->Clear();
+
+      probe_insert_destination_proto->set_insert_destination_type(S::InsertDestinationType::PARTITION_AWARE);
+      probe_insert_destination_proto->MutableExtension(S::PartitionAwareInsertDestination::partition_scheme)
+          ->MergeFrom(probe_partition_scheme->getProto());
+    }
+  }
+
   // Convert the residual predicate proto.
   QueryContext::predicate_id residual_predicate_index = QueryContext::kInvalidPredicateId;
   if (physical_plan->residual_predicate()) {
@@ -748,11 +1024,6 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   convertNamedExpressions(physical_plan->project_expressions(),
                           query_context_proto_->add_scalar_groups());
 
-  const CatalogRelationInfo *build_relation_info =
-      findRelationInfoOutputByPhysical(build_physical);
-  const CatalogRelationInfo *probe_operator_info =
-      findRelationInfoOutputByPhysical(probe_physical);
-
   // Create a vector that indicates whether each project expression is using
   // attributes from the build relation as input. This information is required
   // by the current implementation of hash left outer join
@@ -765,30 +1036,17 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
                 build_physical->getOutputAttributes())));
   }
 
-  const CatalogRelation *build_relation = build_relation_info->relation;
-
-  // FIXME(quickstep-team): Add support for self-join.
-  if (build_relation == probe_operator_info->relation) {
-    THROW_SQL_ERROR() << "Self-join is not supported";
-  }
-
   // Create join hash table proto.
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto_->join_hash_tables_size();
   S::QueryContext::HashTableContext *hash_table_context_proto =
       query_context_proto_->add_join_hash_tables();
 
-  // No partition.
-  std::size_t num_partitions = 1;
-  if (build_relation->hasPartitionScheme() &&
-      build_attribute_ids.size() == 1) {
-    const PartitionSchemeHeader &partition_scheme_header =
-        build_relation->getPartitionScheme()->getPartitionSchemeHeader();
-    if (build_attribute_ids[0] == partition_scheme_header.getPartitionAttributeId()) {
-      // TODO(zuyu): add optimizer support for partitioned hash joins.
-      hash_table_context_proto->set_num_partitions(num_partitions);
-    }
-  }
+  const std::size_t build_num_partitions =
+      build_partition_scheme
+          ? build_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
+          : 1u;
+  hash_table_context_proto->set_num_partitions(build_num_partitions);
 
   S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table();
 
@@ -805,6 +1063,8 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
         build_relation->getAttributeById(build_attribute)->getType().getProto());
   }
 
+  const std::size_t build_cardinality =
+      cost_model_for_hash_join_->estimateCardinality(build_physical);
   hash_table_proto->set_estimated_num_entries(build_cardinality);
 
   // Create three operators.
@@ -816,7 +1076,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
               build_relation_info->isStoredRelation(),
               build_attribute_ids,
               any_build_attributes_nullable,
-              num_partitions,
+              build_num_partitions,
               join_hash_table_index));
 
   // Create InsertDestination proto.
@@ -855,11 +1115,11 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
           new HashJoinOperator(
               query_handle_->query_id(),
               *build_relation,
-              *probe_operator_info->relation,
+              *probe_relation,
               probe_operator_info->isStoredRelation(),
               probe_attribute_ids,
               any_probe_attributes_nullable,
-              num_partitions,
+              build_num_partitions,
               *output_relation,
               insert_destination_index,
               join_hash_table_index,
@@ -871,7 +1131,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
 
   const QueryPlan::DAGNodeIndex destroy_operator_index =
       execution_plan_->addRelationalOperator(new DestroyHashOperator(
-          query_handle_->query_id(), num_partitions, join_hash_table_index));
+          query_handle_->query_id(), build_num_partitions, join_hash_table_index));
 
   if (!build_relation_info->isStoredRelation()) {
     execution_plan_->addDirectDependency(build_operator_index,
@@ -902,8 +1162,9 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
       std::piecewise_construct,
       std::forward_as_tuple(physical_plan),
       std::forward_as_tuple(join_operator_index,
-                            output_relation));
-  temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
+                            output_relation,
+                            insert_destination_index));
+  temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation, insert_destination_index);
 
   if (lip_filter_generator_ != nullptr) {
     lip_filter_generator_->addHashJoinInfo(physical_plan,
@@ -979,8 +1240,9 @@ void ExecutionGenerator::convertNestedLoopsJoin(
       std::piecewise_construct,
       std::forward_as_tuple(physical_plan),
       std::forward_as_tuple(join_operator_index,
-                            output_relation));
-  temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation);
+                            output_relation,
+                            insert_destination_index));
+  temporary_relation_info_vec_.emplace_back(join_operator_index, output_relation, insert_destination_index);
 }
 
 void ExecutionGenerator::convertCopyFrom(
@@ -1578,9 +1840,12 @@ void ExecutionGenerator::convertAggregate(
   physical_to_output_relation_map_.emplace(
       std::piecewise_construct,
       std::forward_as_tuple(physical_plan),
-      std::forward_as_tuple(finalize_aggregation_operator_index, output_relation));
+      std::forward_as_tuple(finalize_aggregation_operator_index,
+                            output_relation,
+                            insert_destination_index));
   temporary_relation_info_vec_.emplace_back(finalize_aggregation_operator_index,
-                                            output_relation);
+                                            output_relation,
+                                            insert_destination_index);
 
   const QueryPlan::DAGNodeIndex destroy_aggregation_state_operator_index =
       execution_plan_->addRelationalOperator(
@@ -1641,7 +1906,8 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
                                          false /* is_pipeline_breaker */);
   }
   temporary_relation_info_vec_.emplace_back(run_generator_index,
-                                            initial_runs_relation);
+                                            initial_runs_relation,
+                                            initial_runs_destination_id);
   initial_runs_destination_proto->set_relational_op_index(run_generator_index);
 
   // Create sort configuration for run merging.
@@ -1716,12 +1982,14 @@ void ExecutionGenerator::convertSort(const P::SortPtr &physical_sort) {
       true /* is_pipeline_breaker */);
 
   temporary_relation_info_vec_.emplace_back(merge_run_operator_index,
-                                            sorted_relation);
+                                            sorted_relation,
+                                            sorted_output_destination_id);
   physical_to_output_relation_map_.emplace(
       std::piecewise_construct,
       std::forward_as_tuple(physical_sort),
       std::forward_as_tuple(merge_run_operator_index,
-                            sorted_relation));
+                            sorted_relation,
+                            sorted_output_destination_id));
 }
 
 void ExecutionGenerator::convertTableGenerator(
@@ -1756,8 +2024,9 @@ void ExecutionGenerator::convertTableGenerator(
       std::piecewise_construct,
       std::forward_as_tuple(physical_tablegen),
       std::forward_as_tuple(tablegen_index,
-                            output_relation));
-  temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation);
+                            output_relation,
+                            insert_destination_index));
+  temporary_relation_info_vec_.emplace_back(tablegen_index, output_relation, insert_destination_index);
 }
 
 void ExecutionGenerator::convertWindowAggregate(
@@ -1860,9 +2129,12 @@ void ExecutionGenerator::convertWindowAggregate(
   physical_to_output_relation_map_.emplace(
       std::piecewise_construct,
       std::forward_as_tuple(physical_plan),
-      std::forward_as_tuple(window_aggregation_operator_index, output_relation));
+      std::forward_as_tuple(window_aggregation_operator_index,
+                            output_relation,
+                            insert_destination_index));
   temporary_relation_info_vec_.emplace_back(window_aggregation_operator_index,
-                                            output_relation);
+                                            output_relation,
+                                            insert_destination_index);
 }
 
 }  // namespace optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index eba6eee..8941141 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -132,9 +132,11 @@ class ExecutionGenerator {
    */
   struct CatalogRelationInfo {
     CatalogRelationInfo(const QueryPlan::DAGNodeIndex producer_operator_index_in,
-                        const CatalogRelation *relation_in)
+                        const CatalogRelation *relation_in,
+                        const QueryContext::insert_destination_id output_destination_index_in)
         : producer_operator_index(producer_operator_index_in),
-          relation(relation_in) {}
+          relation(relation_in),
+          output_destination_index(output_destination_index_in) {}
 
     /**
      * @return True if the relation is a stored relation (i.e. not a temporary relation
@@ -146,6 +148,7 @@ class ExecutionGenerator {
 
     const QueryPlan::DAGNodeIndex producer_operator_index;
     const CatalogRelation *relation;
+    const QueryContext::insert_destination_id output_destination_index;
 
     /**
      * @brief Represents an invalid node index.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/query_optimizer/tests/execution_generator/Partition.test
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/execution_generator/Partition.test b/query_optimizer/tests/execution_generator/Partition.test
index ab05391..3bee78b 100644
--- a/query_optimizer/tests/execution_generator/Partition.test
+++ b/query_optimizer/tests/execution_generator/Partition.test
@@ -15,19 +15,35 @@
 # specific language governing permissions and limitations
 # under the License.
 
-CREATE TABLE foo (id INT NULL,
-                  name CHAR(20))
+CREATE TABLE dim (id INT NULL,
+                  char_col CHAR(20))
+PARTITION BY HASH(id) PARTITIONS 4;
+CREATE TABLE dim2 (id INT NULL,
+                   char_col CHAR(20))
+PARTITION BY HASH(id) PARTITIONS 2;
+CREATE TABLE fact (id INT NULL,
+                  score DOUBLE NULL)
 PARTITION BY HASH(id) PARTITIONS 4;
 
-INSERT INTO foo
+INSERT INTO dim
 SELECT int_col, char_col
 FROM test
 WHERE int_col > 0 OR int_col < 0;
 
-SELECT * FROM foo;
+INSERT INTO dim2
+SELECT int_col, char_col
+FROM test
+WHERE int_col > 0 OR int_col < 0;
+
+INSERT INTO fact
+SELECT int_col, double_col
+FROM test
+WHERE int_col % 2 = 0;
+
+SELECT * FROM dim;
 --
 +-----------+--------------------+
-|id         |name                |
+|id         |char_col            |
 +-----------+--------------------+
 |          4|          4 2.000000|
 |          8|          8 2.828427|
@@ -52,3 +68,48 @@ SELECT * FROM foo;
 |        -17|        -17 4.123106|
 |        -21|        -21 4.582576|
 +-----------+--------------------+
+==
+
+# Partitioned Hash Join.
+SELECT fact.id, dim.char_col
+FROM dim JOIN fact ON dim.id = fact.id;
+--
++-----------+--------------------+
+|id         |char_col            |
++-----------+--------------------+
+|          4|          4 2.000000|
+|          8|          8 2.828427|
+|         12|         12 3.464102|
+|         16|         16 4.000000|
+|         24|         24 4.898979|
+|          2|          2 1.414214|
+|          6|          6 2.449490|
+|         14|         14 3.741657|
+|         18|         18 4.242641|
+|         22|         22 4.690416|
++-----------+--------------------+
+==
+
+# Hash Join with two stored relations, one of which is partitioned.
+SELECT fact.id, test.char_col
+FROM test JOIN fact ON test.int_col = fact.id;
+--
+[same as above]
+==
+
+# Hash Join with one stored, partitioned relation,
+# and a non-stored, non-partitioned one.
+SELECT fact.id, test.char_col
+FROM fact JOIN test ON fact.id = test.int_col
+WHERE test.int_col % 2 = 0;
+--
+[same as above]
+==
+
+# Repartitioned Hash Join.
+SELECT fact.id, dim2.char_col
+FROM dim2, fact
+WHERE dim2.id = fact.id
+  AND dim2.id % 2 = 0;
+--
+[same as above]

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ce6a20b/types/TypedValue.hpp
----------------------------------------------------------------------
diff --git a/types/TypedValue.hpp b/types/TypedValue.hpp
index 0ba3d53..1b564c5 100644
--- a/types/TypedValue.hpp
+++ b/types/TypedValue.hpp
@@ -253,6 +253,25 @@ class TypedValue {
   }
 
   /**
+   * @brief Equality operator: same type ID, same nullness, and (if non-NULL) fastEqualCheck() holds.
+   **/
+  bool operator==(const TypedValue &rhs) const {
+    if (getTypeID() != rhs.getTypeID()) {
+      return false;
+    }
+
+    if (isNull() != rhs.isNull()) {
+      return false;
+    }
+
+    if (isNull()) {
+      return true;  // Both sides are NULL here, and their type IDs already matched.
+    }
+
+    return fastEqualCheck(rhs);  // NOTE(review): defined outside this hunk; confirm it only needs equal type IDs.
+  }
+
+  /**
    * @brief Create a new literal TypedValue with pre-allocated out-of-line
    *        data.
    * @warning The memory at value_ptr must be allocated with malloc() or