You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this plain-text copy.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/06/13 20:21:58 UTC

incubator-quickstep git commit: Refactored the operator support for broadcast hash join.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 8a0e24787 -> 2a622460b


Refactored the operator support for broadcast hash join.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/2a622460
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/2a622460
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/2a622460

Branch: refs/heads/master
Commit: 2a622460ba3c48ec9dd2b12b79e753e6214e3b7b
Parents: 8a0e247
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Jun 4 23:38:25 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Jun 12 17:06:03 2017 -0500

----------------------------------------------------------------------
 relational_operators/BuildHashOperator.hpp | 21 +++++++++++++----
 relational_operators/HashJoinOperator.cpp  | 16 ++++++-------
 relational_operators/HashJoinOperator.hpp  | 31 ++++++++-----------------
 3 files changed, 34 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a622460/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 634e1dd..50dbc05 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -76,7 +76,7 @@ class BuildHashOperator : public RelationalOperator {
    * @param join_key_attributes The IDs of equijoin attributes in
    *        input_relation.
    * @param any_join_key_attributes_nullable If any attribute is nullable.
-   * @param num_partitions The number of partitions in 'input_relation'. If no
+   * @param num_partitions The number of partitions in 'probe_relation'. If no
    *        partitions, it is one.
    * @param hash_table_index The index of the JoinHashTable in QueryContext.
    *        The HashTable's key Type(s) should be the Type(s) of the
@@ -95,6 +95,7 @@ class BuildHashOperator : public RelationalOperator {
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         num_partitions_(num_partitions),
+        is_broadcast_join_(num_partitions > 1u && !input_relation.hasPartitionScheme()),
         hash_table_index_(hash_table_index),
         input_relation_block_ids_(num_partitions),
         num_workorders_generated_(num_partitions),
@@ -102,12 +103,15 @@ class BuildHashOperator : public RelationalOperator {
     if (input_relation_is_stored) {
       if (input_relation.hasPartitionScheme()) {
         const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
-        for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+        DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_);
+        for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
           input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
         }
       } else {
-        // No partition.
-        input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
+        // Broadcast hash join if build has no partitions.
+        for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+          input_relation_block_ids_[part_id] = input_relation.getBlocksSnapshot();
+        }
       }
     }
   }
@@ -136,7 +140,13 @@ class BuildHashOperator : public RelationalOperator {
 
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                       const partition_id part_id) override {
-    input_relation_block_ids_[part_id].push_back(input_block_id);
+    if (is_broadcast_join_) {
+      for (partition_id probe_part_id = 0; probe_part_id < num_partitions_; ++probe_part_id) {
+        input_relation_block_ids_[probe_part_id].push_back(input_block_id);
+      }
+    } else {
+      input_relation_block_ids_[part_id].push_back(input_block_id);
+    }
   }
 
  private:
@@ -153,6 +163,7 @@ class BuildHashOperator : public RelationalOperator {
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
   const std::size_t num_partitions_;
+  const bool is_broadcast_join_;
   const QueryContext::join_hash_table_id hash_table_index_;
 
   // The index is the partition id.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a622460/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index f8acd74..77dc879 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -218,7 +218,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
         return true;
       }
 
-      for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
         const JoinHashTable &hash_table =
             *(query_context->getJoinHashTable(hash_table_index_, part_id));
 
@@ -234,7 +234,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
       started_ = true;
       return true;
     } else {
-      for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
         const JoinHashTable &hash_table =
             *(query_context->getJoinHashTable(hash_table_index_, part_id));
 
@@ -274,7 +274,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
         return true;
       }
 
-      for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
         const JoinHashTable &hash_table =
             *(query_context->getJoinHashTable(hash_table_index_, part_id));
 
@@ -290,7 +290,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
       started_ = true;
       return true;
     } else {
-      for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+      for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
         const JoinHashTable &hash_table =
             *(query_context->getJoinHashTable(hash_table_index_, part_id));
 
@@ -340,7 +340,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos(
       return true;
     }
 
-    for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
         container->addWorkOrderProto(
             createNonOuterJoinWorkOrderProto(hash_join_type, probe_block_id, part_id),
@@ -350,7 +350,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos(
     started_ = true;
     return true;
   } else {
-    for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
         container->addWorkOrderProto(
             createNonOuterJoinWorkOrderProto(hash_join_type,
@@ -402,7 +402,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *
       return true;
     }
 
-    for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
         container->addWorkOrderProto(createOuterJoinWorkOrderProto(probe_block_id, part_id), op_index_);
       }
@@ -410,7 +410,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *
     started_ = true;
     return true;
   } else {
-    for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+    for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
       while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
         container->addWorkOrderProto(
             createOuterJoinWorkOrderProto(probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/2a622460/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 8e9f2d7..6391847 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -102,7 +102,7 @@ class HashJoinOperator : public RelationalOperator {
    * @param join_key_attributes The IDs of equijoin attributes in
    *        probe_relation.
    * @param any_join_key_attributes_nullable If any attribute is nullable.
-   * @param build_num_partitions The number of partitions in 'build_relation'.
+   * @param num_partitions The number of partitions in 'probe_relation'.
    *        If no partitions, it is one.
    * @param output_relation The output relation.
    * @param output_destination_index The index of the InsertDestination in the
@@ -129,7 +129,7 @@ class HashJoinOperator : public RelationalOperator {
       const bool probe_relation_is_stored,
       const std::vector<attribute_id> &join_key_attributes,
       const bool any_join_key_attributes_nullable,
-      const std::size_t build_num_partitions,
+      const std::size_t num_partitions,
       const CatalogRelation &output_relation,
       const QueryContext::insert_destination_id output_destination_index,
       const QueryContext::join_hash_table_id hash_table_index,
@@ -143,7 +143,7 @@ class HashJoinOperator : public RelationalOperator {
         probe_relation_is_stored_(probe_relation_is_stored),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        build_num_partitions_(build_num_partitions),
+        num_partitions_(num_partitions),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         hash_table_index_(hash_table_index),
@@ -153,8 +153,8 @@ class HashJoinOperator : public RelationalOperator {
                                    ? std::vector<bool>()
                                    : *is_selection_on_build),
         join_type_(join_type),
-        probe_relation_block_ids_(build_num_partitions),
-        num_workorders_generated_(build_num_partitions),
+        probe_relation_block_ids_(num_partitions),
+        num_workorders_generated_(num_partitions),
         started_(false) {
     DCHECK(join_type != JoinType::kLeftOuterJoin ||
                (is_selection_on_build != nullptr &&
@@ -163,15 +163,12 @@ class HashJoinOperator : public RelationalOperator {
     if (probe_relation_is_stored) {
       if (probe_relation.hasPartitionScheme()) {
         const PartitionScheme &part_scheme = *probe_relation.getPartitionScheme();
-        DCHECK_EQ(build_num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions());
-        for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+        for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
           probe_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
         }
       } else {
-        // Broadcast hash join if probe has no partitions.
-        for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
-          probe_relation_block_ids_[part_id] = probe_relation.getBlocksSnapshot();
-        }
+        // No partitions.
+        probe_relation_block_ids_[0] = probe_relation.getBlocksSnapshot();
       }
     }
   }
@@ -227,15 +224,7 @@ class HashJoinOperator : public RelationalOperator {
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                       const partition_id part_id) override {
     DCHECK_EQ(probe_relation_.getID(), input_relation_id);
-
-    if (probe_relation_.hasPartitionScheme()) {
-      probe_relation_block_ids_[part_id].push_back(input_block_id);
-    } else {
-      // Broadcast hash join if probe has no partitions.
-      for (std::size_t build_part_id = 0; build_part_id < build_num_partitions_; ++build_part_id) {
-        probe_relation_block_ids_[build_part_id].push_back(input_block_id);
-      }
-    }
+    probe_relation_block_ids_[part_id].push_back(input_block_id);
   }
 
   QueryContext::insert_destination_id getInsertDestinationID() const override {
@@ -287,7 +276,7 @@ class HashJoinOperator : public RelationalOperator {
   const bool probe_relation_is_stored_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const std::size_t build_num_partitions_;
+  const std::size_t num_partitions_;
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   const QueryContext::join_hash_table_id hash_table_index_;