You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/10/23 22:24:59 UTC

incubator-quickstep git commit: Fuse HashJoin and Select.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/lip-fuse-select-join-fast-getblock [created] 13eed6e32


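Fuse HashJoin and Select.

Adds the FuseJoinSelect physical rule: when the probe (left) input of a
HashJoin is a Selection that only passes its input attributes through
(Selection::isSimpleSelection), the Selection is removed and its filter
predicate is attached to the HashJoin as left_filter_predicate.
HashJoinOperator evaluates that predicate on each probe block before
applying the LIP filters and probing the hash table.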


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/13eed6e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/13eed6e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/13eed6e3

Branch: refs/heads/lip-fuse-select-join-fast-getblock
Commit: 13eed6e322560cf196fccff53bb436c7200ee818
Parents: 0af0274
Author: Hakan Memisoglu <ha...@gmail.com>
Authored: Tue Aug 16 16:40:27 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Sun Oct 23 17:24:34 2016 -0500

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |   1 +
 query_optimizer/ExecutionGenerator.cpp          |  13 +-
 query_optimizer/PhysicalGenerator.cpp           |   2 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    |  23 ++-
 query_optimizer/physical/HashJoin.cpp           |   5 +
 query_optimizer/physical/HashJoin.hpp           |  18 +-
 query_optimizer/physical/Selection.cpp          |  14 ++
 query_optimizer/physical/Selection.hpp          |   2 +
 query_optimizer/rules/CMakeLists.txt            |  11 ++
 query_optimizer/rules/FuseJoinSelect.cpp        |  56 +++++++
 query_optimizer/rules/FuseJoinSelect.hpp        |  53 ++++++
 relational_operators/HashJoinOperator.cpp       | 168 +++++++++++++------
 relational_operators/HashJoinOperator.hpp       |  15 ++
 13 files changed, 321 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 00d5163..653690c 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -208,6 +208,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_rules_FuseJoinSelect
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 2e0d8f3..2985045 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -665,10 +665,20 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   if (physical_plan->residual_predicate()) {
     residual_predicate_index = query_context_proto_->predicates_size();
 
-    unique_ptr<const Predicate> residual_predicate(convertPredicate(physical_plan->residual_predicate()));
+    std::unique_ptr<const Predicate> residual_predicate(
+        convertPredicate(physical_plan->residual_predicate()));
     query_context_proto_->add_predicates()->CopyFrom(residual_predicate->getProto());
   }
 
+  // Convert the probe side fused predicate proto.
+  QueryContext::predicate_id probe_side_predicate_index = QueryContext::kInvalidPredicateId;
+  if (physical_plan->left_filter_predicate()) {
+    probe_side_predicate_index = query_context_proto_->predicates_size();
+    std::unique_ptr<const Predicate> probe_side_predicate(
+        convertPredicate(physical_plan->left_filter_predicate()));
+    query_context_proto_->add_predicates()->CopyFrom(probe_side_predicate->getProto());
+  }
+
   // Convert the project expressions proto.
   const QueryContext::scalar_group_id project_expressions_group_index =
       query_context_proto_->scalar_groups_size();
@@ -773,6 +783,7 @@ void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
               insert_destination_index,
               join_hash_table_index,
               residual_predicate_index,
+              probe_side_predicate_index,
               project_expressions_group_index,
               is_selection_on_build.get(),
               join_type));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index 7cb97dc..677ef66 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/FuseJoinSelect.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 #include "query_optimizer/rules/SwapProbeBuild.hpp"
@@ -109,6 +110,7 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
   } else {
     rules.emplace_back(new SwapProbeBuild());
   }
+  rules.emplace_back(new FuseJoinSelect());
   if (FLAGS_use_lip_filters) {
     rules.emplace_back(new AttachLIPFilters());
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 1075739..33dffec 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -137,8 +137,14 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin(
   std::size_t right_cardinality = estimateCardinality(physical_plan->right());
   double left_selectivity = estimateSelectivity(physical_plan->left());
   double right_selectivity = estimateSelectivity(physical_plan->right());
-  return std::max(static_cast<std::size_t>(left_cardinality * right_selectivity + 0.5),
-                  static_cast<std::size_t>(right_cardinality * left_selectivity + 0.5));
+  double joint_filter_selectivity =
+      estimateSelectivityForPredicate(physical_plan->residual_predicate(), physical_plan) *
+          estimateSelectivityForPredicate(physical_plan->left_filter_predicate(), physical_plan);
+  return std::max(
+      static_cast<std::size_t>(
+          left_cardinality * right_selectivity * joint_filter_selectivity + 0.5),
+      static_cast<std::size_t>(
+          right_cardinality * left_selectivity * joint_filter_selectivity + 0.5));
 }
 
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForNestedLoopsJoin(
@@ -216,13 +222,16 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);
+      double left_fused_filter_selectivity =
+          estimateSelectivityForPredicate(hash_join->left_filter_predicate(), hash_join);
       if (E::ContainsExprId(hash_join->left()->getOutputAttributes(), attribute_id)) {
         std::size_t left_child_num_distinct_values =
             estimateNumDistinctValues(attribute_id, hash_join->left());
         double right_child_selectivity =
             estimateSelectivity(hash_join->right());
         return static_cast<std::size_t>(
-            left_child_num_distinct_values * right_child_selectivity * filter_selectivity + 0.5);
+            left_child_num_distinct_values * left_fused_filter_selectivity *
+                right_child_selectivity * filter_selectivity + 0.5);
       }
       if (E::ContainsExprId(hash_join->right()->getOutputAttributes(), attribute_id)) {
         std::size_t right_child_num_distinct_values =
@@ -230,8 +239,10 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
         double left_child_selectivity =
             estimateSelectivity(hash_join->left());
         return static_cast<std::size_t>(
-            right_child_num_distinct_values * left_child_selectivity * filter_selectivity + 0.5);
+            right_child_num_distinct_values * left_child_selectivity *
+                left_fused_filter_selectivity * filter_selectivity + 0.5);
       }
+      break;
     }
     default:
       break;
@@ -258,7 +269,9 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
           estimateSelectivityForPredicate(hash_join->residual_predicate(), hash_join);
       double child_selectivity =
           estimateSelectivity(hash_join->left()) * estimateSelectivity(hash_join->right());
-      return filter_selectivity * child_selectivity;
+      double left_fused_filter_selectivity =
+          estimateSelectivityForPredicate(hash_join->left_filter_predicate(), hash_join);
+      return filter_selectivity * child_selectivity * left_fused_filter_selectivity;
     }
     case P::PhysicalType::kNestedLoopsJoin: {
       const P::NestedLoopsJoinPtr &nested_loop_join =

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/physical/HashJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp
index e186072..fb9fd2c 100644
--- a/query_optimizer/physical/HashJoin.cpp
+++ b/query_optimizer/physical/HashJoin.cpp
@@ -102,6 +102,11 @@ void HashJoin::getFieldStringItems(
     non_container_child_field_names->push_back("residual_predicate");
     non_container_child_fields->push_back(residual_predicate_);
   }
+  if (left_filter_predicate_ != nullptr) {
+    non_container_child_field_names->push_back("left_filter_predicate");
+    non_container_child_fields->push_back(left_filter_predicate_);
+  }
+
   container_child_field_names->push_back("left_join_attributes");
   container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(left_join_attributes_));
   container_child_field_names->push_back("right_join_attributes");

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/physical/HashJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index c513f77..2133a8a 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -107,6 +107,10 @@ class HashJoin : public BinaryJoin {
     return join_type_;
   }
 
+  const expressions::PredicatePtr& left_filter_predicate() const {  // Probe-side filter; may be null.
+    return left_filter_predicate_;
+  }
+  // NOTE(review): verify copyWithNewChildren() forwards left_filter_predicate_, else it is lost on copy.
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override {
     DCHECK_EQ(children().size(), new_children.size());
@@ -136,6 +140,7 @@ class HashJoin : public BinaryJoin {
    * @param residual_predicate Optional filtering predicate evaluated after join.
    * @param project_expressions The project expressions.
    * @param Join type of this hash join.
+   * @param left_filter_predicate Optional filtering predicate for probe side before join.
    * @return An immutable physical HashJoin.
    */
   static HashJoinPtr Create(
@@ -145,7 +150,8 @@ class HashJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const JoinType join_type) {
+      const JoinType join_type,
+      const expressions::PredicatePtr &left_filter_predicate = nullptr) {
     return HashJoinPtr(
         new HashJoin(left,
                      right,
@@ -153,7 +159,8 @@ class HashJoin : public BinaryJoin {
                      right_join_attributes,
                      residual_predicate,
                      project_expressions,
-                     join_type));
+                     join_type,
+                     left_filter_predicate));
   }
 
  protected:
@@ -173,18 +180,21 @@ class HashJoin : public BinaryJoin {
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
       const std::vector<expressions::NamedExpressionPtr> &project_expressions,
-      const JoinType join_type)
+      const JoinType join_type,
+      const expressions::PredicatePtr &left_filter_predicate = nullptr)
       : BinaryJoin(left, right, project_expressions),
         left_join_attributes_(left_join_attributes),
         right_join_attributes_(right_join_attributes),
         residual_predicate_(residual_predicate),
-        join_type_(join_type) {
+        join_type_(join_type),
+        left_filter_predicate_(left_filter_predicate) {
   }
 
   std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
   std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
   expressions::PredicatePtr residual_predicate_;
   JoinType join_type_;
+  expressions::PredicatePtr left_filter_predicate_;
 
   DISALLOW_COPY_AND_ASSIGN(HashJoin);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/physical/Selection.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.cpp b/query_optimizer/physical/Selection.cpp
index 36ade04..8260ab1 100644
--- a/query_optimizer/physical/Selection.cpp
+++ b/query_optimizer/physical/Selection.cpp
@@ -20,6 +20,7 @@
 #include "query_optimizer/physical/Selection.hpp"
 
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
@@ -36,6 +37,19 @@ namespace physical {
 
 namespace E = ::quickstep::optimizer::expressions;
 
+bool Selection::isSimpleSelection() const {
+  std::unordered_set<E::ExprId> input_attr_ids;
+  for (const auto &attr : input()->getOutputAttributes()) {
+    input_attr_ids.emplace(attr->id());
+  }
+  for (const auto &attr : getOutputAttributes()) {
+    if (input_attr_ids.find(attr->id()) == input_attr_ids.end()) {
+      return false;  // Output attribute not produced by input(): not a pass-through projection.
+    }
+  }
+  return true;  // All output attributes are input() attributes (matched by ExprId).
+}
+
 PhysicalPtr Selection::copyWithNewChildren(
     const std::vector<PhysicalPtr> &new_children) const {
   DCHECK_EQ(children().size(), new_children.size());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/physical/Selection.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/Selection.hpp b/query_optimizer/physical/Selection.hpp
index b6874a1..dd4aaf3 100644
--- a/query_optimizer/physical/Selection.hpp
+++ b/query_optimizer/physical/Selection.hpp
@@ -75,6 +75,8 @@ class Selection : public Physical {
    */
   inline const PhysicalPtr& input() const { return children()[0]; }
 
+  bool isSimpleSelection() const;  // True iff every output attribute's ExprId is an input() attribute.
+
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 29875f6..54ce405 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,7 @@ add_subdirectory(tests)
 add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_FuseJoinSelect FuseJoinSelect.cpp FuseJoinSelect.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownFilter PushDownFilter.cpp PushDownFilter.hpp)
@@ -66,6 +67,15 @@ target_link_libraries(quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_FuseJoinSelect
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TableReference
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_queryoptimizer_rules_Rule)
 target_link_libraries(quickstep_queryoptimizer_rules_GenerateJoins
                       glog
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -208,6 +218,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
+                      quickstep_queryoptimizer_rules_FuseJoinSelect
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownFilter

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/rules/FuseJoinSelect.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.cpp b/query_optimizer/rules/FuseJoinSelect.cpp
new file mode 100644
index 0000000..d785b02
--- /dev/null
+++ b/query_optimizer/rules/FuseJoinSelect.cpp
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/FuseJoinSelect.hpp"
+
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr FuseJoinSelect::applyToNode(const P::PhysicalPtr &input) {
+  P::HashJoinPtr hash_join;
+  P::SelectionPtr selection;
+  // Fuse only if the join has no left filter yet; Create() below would otherwise discard it.
+  if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join) &&
+      P::SomeSelection::MatchesWithConditionalCast(hash_join->left(), &selection) &&
+      hash_join->left_filter_predicate() == nullptr && selection->isSimpleSelection()) {
+    P::PhysicalPtr output = P::HashJoin::Create(selection->input(),
+                                                hash_join->right(),
+                                                hash_join->left_join_attributes(),
+                                                hash_join->right_join_attributes(),
+                                                hash_join->residual_predicate(),
+                                                hash_join->project_expressions(),
+                                                hash_join->join_type(),
+                                                selection->filter_predicate());
+    LOG_APPLYING_RULE(input, output);
+    return output;
+  }
+
+  LOG_IGNORING_RULE(input);
+  return input;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/query_optimizer/rules/FuseJoinSelect.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/FuseJoinSelect.hpp b/query_optimizer/rules/FuseJoinSelect.hpp
new file mode 100644
index 0000000..7b24809
--- /dev/null
+++ b/query_optimizer/rules/FuseJoinSelect.hpp
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_JOIN_SELECT_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_JOIN_SELECT_HPP_
+
+#include <string>
+
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+/** @brief Fuses a pass-through Selection on a HashJoin's probe (left) side into the HashJoin. */
+class FuseJoinSelect : public BottomUpRule<physical::Physical> {
+ public:
+  FuseJoinSelect() {}
+
+  std::string getName() const override { return "FuseJoinSelect"; }
+
+ protected:
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &input) override;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(FuseJoinSelect);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_FUSE_JOIN_SELECT_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 4a91f86..55f1720 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -172,6 +172,8 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
   if (blocking_dependencies_met_) {
     DCHECK(query_context != nullptr);
 
+    const Predicate *probe_side_predicate =
+        query_context->getPredicate(probe_side_predicate_index_);
     const Predicate *residual_predicate =
         query_context->getPredicate(residual_predicate_index_);
     const vector<unique_ptr<const Scalar>> &selection =
@@ -192,6 +194,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                                      any_join_key_attributes_nullable_,
                                      probe_block_id,
                                      residual_predicate,
+                                     probe_side_predicate,
                                      selection,
                                      hash_table,
                                      output_destination,
@@ -213,6 +216,7 @@ bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
                 any_join_key_attributes_nullable_,
                 probe_relation_block_ids_[num_workorders_generated_],
                 residual_predicate,
+                probe_side_predicate,
                 selection,
                 hash_table,
                 output_destination,
@@ -235,6 +239,8 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
   if (blocking_dependencies_met_) {
     DCHECK(query_context != nullptr);
 
+    const Predicate *probe_side_predicate =
+        query_context->getPredicate(probe_side_predicate_index_);
     const vector<unique_ptr<const Scalar>> &selection =
         query_context->getScalarGroup(selection_index_);
 
@@ -254,6 +260,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
                   join_key_attributes_,
                   any_join_key_attributes_nullable_,
                   probe_block_id,
+                  probe_side_predicate,
                   selection,
                   is_selection_on_build_,
                   hash_table,
@@ -275,6 +282,7 @@ bool HashJoinOperator::getAllOuterJoinWorkOrders(
                 join_key_attributes_,
                 any_join_key_attributes_nullable_,
                 probe_relation_block_ids_[num_workorders_generated_],
+                probe_side_predicate,
                 selection,
                 is_selection_on_build_,
                 hash_table,
@@ -419,29 +427,39 @@ void HashInnerJoinWorkOrder::execute() {
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
-  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<ValueAccessor> base_accessor(probe_store.createValueAccessor());
+  ValueAccessor *probe_accessor = base_accessor.get();
+
   std::unique_ptr<TupleIdSequence> existence_map;
-  std::unique_ptr<ValueAccessor> base_accessor;
+  std::unique_ptr<ValueAccessor> adapter_accessor;
+  // Apply probe side fused predicate first.
+  if (probe_side_predicate_ != nullptr) {
+    existence_map.reset(
+        probe_block->getMatchesForPredicate(probe_side_predicate_));
+    adapter_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
+  }
+  // Then apply the LIPFilters.
   if (lip_filter_adaptive_prober_ != nullptr) {
-    base_accessor.reset(probe_accessor.release());
     existence_map.reset(
-        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
-    probe_accessor.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(probe_accessor));
+    adapter_accessor.reset(
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
   }
 
   MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
     hash_table_.getAllFromValueAccessorCompositeKey(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_,
         any_join_key_attributes_nullable_,
         &collector);
@@ -508,7 +526,7 @@ void HashInnerJoinWorkOrder::execute() {
       temp_result.addColumn((*selection_cit)->getAllValuesForJoin(build_relation_id,
                                                                   build_accessor.get(),
                                                                   probe_relation_id,
-                                                                  probe_accessor.get(),
+                                                                  probe_accessor,
                                                                   build_block_entry.second));
     }
 
@@ -535,17 +553,27 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
-  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<ValueAccessor> base_accessor(probe_store.createValueAccessor());
+  ValueAccessor *probe_accessor = base_accessor.get();
+
   std::unique_ptr<TupleIdSequence> existence_map;
-  std::unique_ptr<ValueAccessor> base_accessor;
+  std::unique_ptr<ValueAccessor> adapter_accessor;
+  // Apply probe side fused predicate first.
+  if (probe_side_predicate_ != nullptr) {
+    existence_map.reset(
+        probe_block->getMatchesForPredicate(probe_side_predicate_));
+    adapter_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
+  }
+  // Then apply the LIPFilters.
   if (lip_filter_adaptive_prober_ != nullptr) {
-    base_accessor.reset(probe_accessor.release());
     existence_map.reset(
-        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
-    probe_accessor.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(probe_accessor));
+    adapter_accessor.reset(
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
   }
 
   // We collect all the matching probe relation tuples, as there's a residual
@@ -553,13 +581,13 @@ void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
   MapBasedJoinedTupleCollector collector;
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
     hash_table_.getAllFromValueAccessorCompositeKey(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_,
         any_join_key_attributes_nullable_,
         &collector);
@@ -625,17 +653,27 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
-  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<ValueAccessor> base_accessor(probe_store.createValueAccessor());
+  ValueAccessor *probe_accessor = base_accessor.get();
+
   std::unique_ptr<TupleIdSequence> existence_map;
-  std::unique_ptr<ValueAccessor> base_accessor;
+  std::unique_ptr<ValueAccessor> adapter_accessor;
+  // Apply probe side fused predicate first.
+  if (probe_side_predicate_ != nullptr) {
+    existence_map.reset(
+        probe_block->getMatchesForPredicate(probe_side_predicate_));
+    adapter_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
+  }
+  // Then apply the LIPFilters.
   if (lip_filter_adaptive_prober_ != nullptr) {
-    base_accessor.reset(probe_accessor.release());
     existence_map.reset(
-        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
-    probe_accessor.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(probe_accessor));
+    adapter_accessor.reset(
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
   }
 
   if (existence_map == nullptr) {
@@ -652,14 +690,14 @@ void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
   if (join_key_attributes_.size() == 1u) {
     // Call the collector to set the bit to 0 for every key without a match.
     hash_table_.runOverKeysFromValueAccessorIfMatchNotFound(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
     // Call the collector to set the bit to 0 for every key without a match.
     hash_table_.runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_,
         any_join_key_attributes_nullable_,
         &collector);
@@ -687,17 +725,27 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
-  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<ValueAccessor> base_accessor(probe_store.createValueAccessor());
+  ValueAccessor *probe_accessor = base_accessor.get();
+
   std::unique_ptr<TupleIdSequence> existence_map;
-  std::unique_ptr<ValueAccessor> base_accessor;
+  std::unique_ptr<ValueAccessor> adapter_accessor;
+  // Apply probe side fused predicate first.
+  if (probe_side_predicate_ != nullptr) {
+    existence_map.reset(
+        probe_block->getMatchesForPredicate(probe_side_predicate_));
+    adapter_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
+  }
+  // Then apply the LIPFilters.
   if (lip_filter_adaptive_prober_ != nullptr) {
-    base_accessor.reset(probe_accessor.release());
     existence_map.reset(
-        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
-    probe_accessor.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(probe_accessor));
+    adapter_accessor.reset(
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
   }
 
   if (existence_map == nullptr) {
@@ -710,14 +758,14 @@ void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
   if (join_key_attributes_.size() == 1) {
     // Call the collector to set the bit to 0 for every key with a match.
     hash_table_.runOverKeysFromValueAccessorIfMatchFound(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
     // Call the collector to set the bit to 0 for every key with a match.
     hash_table_.runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_,
         any_join_key_attributes_nullable_,
         &collector);
@@ -746,17 +794,27 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
   BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                           probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
-  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<ValueAccessor> base_accessor(probe_store.createValueAccessor());
+  ValueAccessor *probe_accessor = base_accessor.get();
+
   std::unique_ptr<TupleIdSequence> existence_map;
-  std::unique_ptr<ValueAccessor> base_accessor;
+  std::unique_ptr<ValueAccessor> adapter_accessor;
+  // Apply probe side fused predicate first.
+  if (probe_side_predicate_ != nullptr) {
+    existence_map.reset(
+        probe_block->getMatchesForPredicate(probe_side_predicate_));
+    adapter_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
+  }
+  // Then apply the LIPFilters.
   if (lip_filter_adaptive_prober_ != nullptr) {
-    base_accessor.reset(probe_accessor.release());
     existence_map.reset(
-        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
-    probe_accessor.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(probe_accessor));
+    adapter_accessor.reset(
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
   }
 
   MapBasedJoinedTupleCollector collector;
@@ -766,13 +824,13 @@ void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
   // predicate.
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessor(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
     hash_table_.getAllFromValueAccessorCompositeKey(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_,
         any_join_key_attributes_nullable_,
         &collector);
@@ -838,17 +896,27 @@ void HashOuterJoinWorkOrder::execute() {
   const BlockReference probe_block = storage_manager_->getBlock(block_id_,
                                                                 probe_relation_);
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
-  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
 
-  // Probe the LIPFilters to generate an existence bitmap for probe_accessor, if enabled.
+  std::unique_ptr<ValueAccessor> base_accessor(probe_store.createValueAccessor());
+  ValueAccessor *probe_accessor = base_accessor.get();
+
   std::unique_ptr<TupleIdSequence> existence_map;
-  std::unique_ptr<ValueAccessor> base_accessor;
+  std::unique_ptr<ValueAccessor> adapter_accessor;
+  // Apply probe side fused predicate first.
+  if (probe_side_predicate_ != nullptr) {
+    existence_map.reset(
+        probe_block->getMatchesForPredicate(probe_side_predicate_));
+    adapter_accessor.reset(
+        base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
+  }
+  // Then apply the LIPFilters.
   if (lip_filter_adaptive_prober_ != nullptr) {
-    base_accessor.reset(probe_accessor.release());
     existence_map.reset(
-        lip_filter_adaptive_prober_->filterValueAccessor(base_accessor.get()));
-    probe_accessor.reset(
+        lip_filter_adaptive_prober_->filterValueAccessor(probe_accessor));
+    adapter_accessor.reset(
         base_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+    probe_accessor = adapter_accessor.get();
   }
 
   if (existence_map == nullptr) {
@@ -858,13 +926,13 @@ void HashOuterJoinWorkOrder::execute() {
   OuterJoinTupleCollector collector(existence_map.get());
   if (join_key_attributes_.size() == 1) {
     hash_table_.getAllFromValueAccessorWithExtraWorkForFirstMatch(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
     hash_table_.getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
-        probe_accessor.get(),
+        probe_accessor,
         join_key_attributes_,
         any_join_key_attributes_nullable_,
         &collector);
@@ -889,7 +957,7 @@ void HashOuterJoinWorkOrder::execute() {
               build_relation_id,
               build_accessor.get(),
               probe_relation_id,
-              probe_accessor.get(),
+              probe_accessor,
               build_block_entry.second));
     }
     output_destination_->bulkInsertTuples(&temp_result);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/13eed6e3/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 0ed1eeb..2806b0b 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -129,6 +129,7 @@ class HashJoinOperator : public RelationalOperator {
       const QueryContext::insert_destination_id output_destination_index,
       const QueryContext::join_hash_table_id hash_table_index,
       const QueryContext::predicate_id residual_predicate_index,
+      const QueryContext::predicate_id probe_side_predicate_index,
       const QueryContext::scalar_group_id selection_index,
       const std::vector<bool> *is_selection_on_build = nullptr,
       const JoinType join_type = JoinType::kInnerJoin)
@@ -142,6 +143,7 @@ class HashJoinOperator : public RelationalOperator {
         output_destination_index_(output_destination_index),
         hash_table_index_(hash_table_index),
         residual_predicate_index_(residual_predicate_index),
+        probe_side_predicate_index_(probe_side_predicate_index),
         selection_index_(selection_index),
         is_selection_on_build_(is_selection_on_build == nullptr
                                    ? std::vector<bool>()
@@ -257,6 +259,7 @@ class HashJoinOperator : public RelationalOperator {
   const QueryContext::insert_destination_id output_destination_index_;
   const QueryContext::join_hash_table_id hash_table_index_;
   const QueryContext::predicate_id residual_predicate_index_;
+  const QueryContext::predicate_id probe_side_predicate_index_;
   const QueryContext::scalar_group_id selection_index_;
   const std::vector<bool> is_selection_on_build_;
   const JoinType join_type_;
@@ -306,6 +309,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
       const bool any_join_key_attributes_nullable,
       const block_id lookup_block_id,
       const Predicate *residual_predicate,
+      const Predicate *probe_side_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
@@ -318,6 +322,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
+        probe_side_predicate_(probe_side_predicate),
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -393,6 +398,7 @@ class HashInnerJoinWorkOrder : public WorkOrder {
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
+  const Predicate *probe_side_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const JoinHashTable &hash_table_;
 
@@ -442,6 +448,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
       const bool any_join_key_attributes_nullable,
       const block_id lookup_block_id,
       const Predicate *residual_predicate,
+      const Predicate *probe_side_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
@@ -454,6 +461,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
+        probe_side_predicate_(probe_side_predicate),
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -525,6 +533,7 @@ class HashSemiJoinWorkOrder : public WorkOrder {
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
+  const Predicate *probe_side_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const JoinHashTable &hash_table_;
 
@@ -574,6 +583,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
       const bool any_join_key_attributes_nullable,
       const block_id lookup_block_id,
       const Predicate *residual_predicate,
+      const Predicate *probe_side_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const JoinHashTable &hash_table,
       InsertDestination *output_destination,
@@ -586,6 +596,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
+        probe_side_predicate_(probe_side_predicate),
         selection_(selection),
         hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
@@ -663,6 +674,7 @@ class HashAntiJoinWorkOrder : public WorkOrder {
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
   const Predicate *residual_predicate_;
+  const Predicate *probe_side_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const JoinHashTable &hash_table_;
 
@@ -709,6 +721,7 @@ class HashOuterJoinWorkOrder : public WorkOrder {
       const std::vector<attribute_id> &join_key_attributes,
       const bool any_join_key_attributes_nullable,
       const block_id lookup_block_id,
+      const Predicate *probe_side_predicate,
       const std::vector<std::unique_ptr<const Scalar>> &selection,
       const std::vector<bool> &is_selection_on_build,
       const JoinHashTable &hash_table,
@@ -721,6 +734,7 @@ class HashOuterJoinWorkOrder : public WorkOrder {
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
         block_id_(lookup_block_id),
+        probe_side_predicate_(probe_side_predicate),
         selection_(selection),
         is_selection_on_build_(is_selection_on_build),
         hash_table_(hash_table),
@@ -786,6 +800,7 @@ class HashOuterJoinWorkOrder : public WorkOrder {
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
   const block_id block_id_;
+  const Predicate *probe_side_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
   const std::vector<bool> is_selection_on_build_;
   const JoinHashTable &hash_table_;