You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2017/01/31 23:36:21 UTC

[1/2] incubator-quickstep git commit: Add BitVectorExactFilter as a LIP filter and supports Join-to-Semijoin transformation.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 5ffdaaf9f -> 4ba819c5b


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/InjectJoinFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.cpp b/query_optimizer/rules/InjectJoinFilters.cpp
new file mode 100644
index 0000000..0fcd06b
--- /dev/null
+++ b/query_optimizer/rules/InjectJoinFilters.cpp
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/InjectJoinFilters.hpp"
+
+#include <cstddef>
+#include <cstdint>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "query_optimizer/rules/PruneColumns.hpp"
+#include "types/TypeID.hpp"
+#include "types/TypedValue.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr InjectJoinFilters::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  const P::TopLevelPlanPtr top_level_plan =
+     std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(
+          top_level_plan->shared_subplans()));
+  lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
+
+  // Step 1. Transform applicable HashJoin nodes to FilterJoin nodes.
+  P::PhysicalPtr output = transformHashJoinToFilters(input);
+
+  // Step 2. Push down FilterJoin nodes to be evaluated early.
+  output = pushDownFilters(output);
+
+  // Step 3. Add Selection nodes for attaching the LIPFilters, if necessary.
+  output = addFilterAnchors(output, false);
+
+  // Step 4. Because of the pushdown of FilterJoin nodes, there are optimization
+  // opportunities for projecting columns early.
+  output = PruneColumns().apply(output);
+
+  // Step 5. For each FilterJoin node, attach its corresponding LIPFilter to
+  // proper nodes.
+  concretizeAsLIPFilters(output, nullptr);
+
+  if (!lip_filter_configuration_->getBuildInfoMap().empty() ||
+      !lip_filter_configuration_->getProbeInfoMap().empty()) {
+    output = std::static_pointer_cast<const P::TopLevelPlan>(output)
+        ->copyWithLIPFilterConfiguration(
+              P::LIPFilterConfigurationPtr(lip_filter_configuration_.release()));
+  }
+
+  return output;
+}
+
+bool InjectJoinFilters::isTransformable(
+    const physical::HashJoinPtr &hash_join) const {
+  // Conditions for replacing a HashJoin with a FilterJoin:
+
+  // No residual predicate.
+  if (hash_join->residual_predicate() != nullptr) {
+    return false;
+  }
+  // Single attribute equi-join.
+  if (hash_join->right_join_attributes().size() > 1) {
+    return false;
+  }
+  // All the output attributes must be from the probe side.
+  if (!E::SubsetOfExpressions(hash_join->getOutputAttributes(),
+                              hash_join->left()->getOutputAttributes())) {
+    return false;
+  }
+  switch (hash_join->join_type()) {
+    case P::HashJoin::JoinType::kInnerJoin: {
+      // In the case of inner join, the build side join attributes must be unique.
+      if (!cost_model_->impliesUniqueAttributes(hash_join->right(),
+                                                hash_join->right_join_attributes())) {
+        return false;
+      }
+      break;
+    }
+    case P::HashJoin::JoinType::kLeftSemiJoin:  // Fall through
+    case P::HashJoin::JoinType::kLeftAntiJoin:
+      break;
+    default:
+      return false;
+  }
+
+  // The build side join attribute has integer type and its values are exactly
+  // within a reasonable range.
+  std::int64_t min_cpp_value;
+  std::int64_t max_cpp_value;
+  const bool has_exact_min_max_stats =
+      findExactMinMaxValuesForAttributeHelper(hash_join->right(),
+                                              hash_join->right_join_attributes().front(),
+                                              &min_cpp_value,
+                                              &max_cpp_value);
+  if (!has_exact_min_max_stats) {
+    return false;
+  }
+
+  // TODO(jianqiao): implement SimpleHashSetExactFilter to relax this requirement.
+  // Note that 1G bits = 128MB.
+  const std::int64_t value_range = max_cpp_value - min_cpp_value;
+  DCHECK_GE(value_range, 0);
+  if (value_range > kMaxFilterSize) {
+    return false;
+  }
+
+  return true;
+}
+
+P::PhysicalPtr InjectJoinFilters::transformHashJoinToFilters(
+    const P::PhysicalPtr &input) const {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed_children = false;
+  for (const P::PhysicalPtr &child : input->children()) {
+    const P::PhysicalPtr new_child = transformHashJoinToFilters(child);
+    if (child != new_child && !has_changed_children) {
+      has_changed_children = true;
+    }
+    new_children.push_back(new_child);
+  }
+
+  P::HashJoinPtr hash_join;
+  if (P::SomeHashJoin::MatchesWithConditionalCast(input, &hash_join) &&
+      isTransformable(hash_join)) {
+    const bool is_anti_join =
+        hash_join->join_type() == P::HashJoin::JoinType::kLeftAntiJoin;
+
+    DCHECK_EQ(2u, new_children.size());
+    P::PhysicalPtr build_child = new_children[1];
+    E::PredicatePtr build_side_filter_predicate = nullptr;
+    P::SelectionPtr selection;
+    if (P::SomeSelection::MatchesWithConditionalCast(build_child, &selection) &&
+        E::SubsetOfExpressions(hash_join->right_join_attributes(),
+                               selection->input()->getOutputAttributes())) {
+      build_child = selection->input();
+      build_side_filter_predicate = selection->filter_predicate();
+    }
+
+    return P::FilterJoin::Create(new_children[0],
+                                 build_child,
+                                 hash_join->left_join_attributes(),
+                                 hash_join->right_join_attributes(),
+                                 hash_join->project_expressions(),
+                                 build_side_filter_predicate,
+                                 is_anti_join);
+  }
+
+  if (has_changed_children) {
+    return input->copyWithNewChildren(new_children);
+  } else {
+    return input;
+  }
+}
+
+physical::PhysicalPtr InjectJoinFilters::pushDownFilters(
+    const physical::PhysicalPtr &input) const {
+  std::vector<P::PhysicalPtr> new_children;
+  bool has_changed_children = false;
+  for (const P::PhysicalPtr &child : input->children()) {
+    const P::PhysicalPtr new_child = pushDownFilters(child);
+    if (child != new_child && !has_changed_children) {
+      has_changed_children = true;
+    }
+    new_children.push_back(new_child);
+  }
+
+  P::FilterJoinPtr filter_join;
+  if (P::SomeFilterJoin::MatchesWithConditionalCast(input, &filter_join)) {
+    DCHECK_EQ(2u, new_children.size());
+    return pushDownFiltersInternal(
+        new_children[0], new_children[1], filter_join);
+  }
+
+  if (has_changed_children) {
+    return input->copyWithNewChildren(new_children);
+  } else {
+    return input;
+  }
+}
+
+physical::PhysicalPtr InjectJoinFilters::pushDownFiltersInternal(
+    const physical::PhysicalPtr &probe_child,
+    const physical::PhysicalPtr &build_child,
+    const physical::FilterJoinPtr &filter_join) const {
+  switch (probe_child->getPhysicalType()) {
+    case P::PhysicalType::kAggregate:  // Fall through
+    case P::PhysicalType::kHashJoin:
+    case P::PhysicalType::kSample:
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kSort:
+    case P::PhysicalType::kWindowAggregate: {
+      DCHECK_GE(probe_child->getNumChildren(), 1u);
+      const P::PhysicalPtr child = probe_child->children()[0];
+      if (E::SubsetOfExpressions(filter_join->probe_attributes(),
+                                 child->getOutputAttributes())) {
+        const P::PhysicalPtr new_child =
+            pushDownFiltersInternal(child, build_child, filter_join);
+        if (new_child != child) {
+          std::vector<P::PhysicalPtr> new_children = probe_child->children();
+          new_children[0] = new_child;
+          return probe_child->copyWithNewChildren(new_children);
+        }
+      }
+    }
+    default:
+      break;
+  }
+
+  if (probe_child != filter_join->left()) {
+    // TODO(jianqiao): may need to update probe_attributes.
+    return P::FilterJoin::Create(probe_child,
+                                 build_child,
+                                 filter_join->probe_attributes(),
+                                 filter_join->build_attributes(),
+                                 E::ToNamedExpressions(probe_child->getOutputAttributes()),
+                                 filter_join->build_side_filter_predicate(),
+                                 filter_join->is_anti_join());
+  } else {
+    return filter_join;
+  }
+}
+
+
+physical::PhysicalPtr InjectJoinFilters::addFilterAnchors(
+    const physical::PhysicalPtr &input,
+    const bool ancestor_can_anchor_filter) const {
+  std::vector<P::PhysicalPtr> new_children;
+
+  switch (input->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(input);
+      new_children.emplace_back(
+          addFilterAnchors(aggregate->input(), true));
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr &selection =
+          std::static_pointer_cast<const P::Selection>(input);
+      new_children.emplace_back(
+          addFilterAnchors(selection->input(), true));
+      break;
+    }
+    // NOTE(jianqiao): Some of the SSB/TPCH queries slow down significantly if
+    // we attach converted filters to parent HashJoin nodes. E.g. one HashJoin +
+    // one attached LIPFilter is slower than the original two HashJoins. This is
+    // due to some implementation issues with the current HashJoinOperator. So
+    // currently we disable the anchoring of filters to HashJoin nodes. That is,
+    // in the case that a FilterJoin's parent node (or ancestor node, if there
+    // is a chain of FilterJoins) is a HashJoin, we create an extra Selection
+    // before the parent HashJoin as anchoring node to attach the filters. This
+    // guarantees non-degrading performance.
+    /*
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr &hash_join =
+          std::static_pointer_cast<const P::HashJoin>(input);
+      new_children.emplace_back(
+          addFilterAnchors(hash_join->left(), true));
+      new_children.emplace_back(
+          addFilterAnchors(hash_join->right(), false));
+      break;
+    }
+    */
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr &filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(input);
+      new_children.emplace_back(
+          addFilterAnchors(filter_join->left(), true));
+      new_children.emplace_back(
+          addFilterAnchors(filter_join->right(), true));
+      break;
+    }
+    default: {
+      for (const P::PhysicalPtr &child : input->children()) {
+        new_children.emplace_back(addFilterAnchors(child, false));
+      }
+    }
+  }
+
+  DCHECK_EQ(new_children.size(), input->children().size());
+  const P::PhysicalPtr output_with_new_children =
+      new_children == input->children()
+          ? input
+          : input->copyWithNewChildren(new_children);
+
+  if (input->getPhysicalType() == P::PhysicalType::kFilterJoin &&
+      !ancestor_can_anchor_filter) {
+    const P::FilterJoinPtr &filter_join =
+        std::static_pointer_cast<const P::FilterJoin>(output_with_new_children);
+    return P::Selection::Create(filter_join,
+                                filter_join->project_expressions(),
+                                nullptr);
+  } else {
+    return output_with_new_children;
+  }
+}
+
+void InjectJoinFilters::concretizeAsLIPFilters(
+    const P::PhysicalPtr &input,
+    const P::PhysicalPtr &anchor_node) const {
+  switch (input->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(input);
+      concretizeAsLIPFilters(aggregate->input(), aggregate);
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr &selection =
+          std::static_pointer_cast<const P::Selection>(input);
+      concretizeAsLIPFilters(selection->input(), selection);
+      break;
+    }
+    // Currently we disable the attachment of filters to HashJoin nodes. See the
+    // comments in InjectJoinFilters::addFilterAnchors().
+    /*
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr &hash_join =
+          std::static_pointer_cast<const P::HashJoin>(input);
+      concretizeAsLIPFilters(hash_join->left(), hash_join);
+      concretizeAsLIPFilters(hash_join->right(), nullptr);
+      break;
+    }
+    */
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr &filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(input);
+      DCHECK_EQ(1u, filter_join->build_attributes().size());
+      const E::AttributeReferencePtr &build_attr =
+          filter_join->build_attributes().front();
+
+      std::int64_t min_cpp_value;
+      std::int64_t max_cpp_value;
+      const bool has_exact_min_max_stats =
+          findExactMinMaxValuesForAttributeHelper(filter_join,
+                                                  build_attr,
+                                                  &min_cpp_value,
+                                                  &max_cpp_value);
+      DCHECK(has_exact_min_max_stats);
+      DCHECK_GE(max_cpp_value, min_cpp_value);
+      DCHECK_LE(max_cpp_value - min_cpp_value, kMaxFilterSize);
+      CHECK(anchor_node != nullptr);
+
+      lip_filter_configuration_->addBuildInfo(
+          P::BitVectorExactFilterBuildInfo::Create(build_attr,
+                                                   min_cpp_value,
+                                                   max_cpp_value,
+                                                   filter_join->is_anti_join()),
+          filter_join);
+      lip_filter_configuration_->addProbeInfo(
+          P::LIPFilterProbeInfo::Create(filter_join->probe_attributes().front(),
+                                        build_attr,
+                                        filter_join),
+          anchor_node);
+
+      concretizeAsLIPFilters(filter_join->left(), anchor_node);
+      concretizeAsLIPFilters(filter_join->right(), filter_join);
+      break;
+    }
+    default: {
+      for (const P::PhysicalPtr &child : input->children()) {
+        concretizeAsLIPFilters(child, nullptr);
+      }
+    }
+  }
+}
+
+bool InjectJoinFilters::findExactMinMaxValuesForAttributeHelper(
+    const physical::PhysicalPtr &physical_plan,
+    const expressions::AttributeReferencePtr &attribute,
+    std::int64_t *min_cpp_value,
+    std::int64_t *max_cpp_value) const {
+  bool min_value_is_exact;
+  bool max_value_is_exact;
+
+  const TypedValue min_value =
+      cost_model_->findMinValueStat(physical_plan, attribute, &min_value_is_exact);
+  const TypedValue max_value =
+      cost_model_->findMaxValueStat(physical_plan, attribute, &max_value_is_exact);
+  if (min_value.isNull() || max_value.isNull() ||
+      (!min_value_is_exact) || (!max_value_is_exact)) {
+    return false;
+  }
+
+  switch (attribute->getValueType().getTypeID()) {
+    case TypeID::kInt: {
+      *min_cpp_value = min_value.getLiteral<int>();
+      *max_cpp_value = max_value.getLiteral<int>();
+      return true;
+    }
+    case TypeID::kLong: {
+      *min_cpp_value = min_value.getLiteral<std::int64_t>();
+      *max_cpp_value = max_value.getLiteral<std::int64_t>();
+      return true;
+    }
+    default:
+      return false;
+  }
+}
+
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/InjectJoinFilters.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/InjectJoinFilters.hpp b/query_optimizer/rules/InjectJoinFilters.hpp
new file mode 100644
index 0000000..c5250b3
--- /dev/null
+++ b/query_optimizer/rules/InjectJoinFilters.hpp
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to transform HashJoin nodes into
+ *        FilterJoin nodes.
+ * 
+ * This is an optimization that strength-reduces HashJoins to FilterJoins
+ * (implemented as LIPFilters attached to some anchoring operators where the
+ * filters get applied). Briefly speaking, the idea is that in the case that
+ * (1) the join attribute has consecutive integer values bounded in a reasonably
+ * small range AND (2) the output attributes are all from the probe-side table,
+ * we can eliminate the HashJoin by building a BitVector on the build-side
+ * attribute and using the BitVector to filter the probe-side table.
+ */
+class InjectJoinFilters : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  InjectJoinFilters() {}
+
+  ~InjectJoinFilters() override {}
+
+  std::string getName() const override {
+    return "TransformFilterJoins";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  // Check whether a HashJoin can be transformed into a FilterJoin.
+  bool isTransformable(const physical::HashJoinPtr &hash_join) const;
+
+  // Transform applicable HashJoin nodes into FilterJoin nodes.
+  physical::PhysicalPtr transformHashJoinToFilters(
+      const physical::PhysicalPtr &input) const;
+
+  // Push down FilterJoin nodes to be evaluated early.
+  physical::PhysicalPtr pushDownFilters(const physical::PhysicalPtr &input) const;
+
+  // Add Selection node, if necessary, for anchoring the LIP filters built by
+  // FilterJoin nodes.
+  physical::PhysicalPtr addFilterAnchors(const physical::PhysicalPtr &input,
+                                         const bool ancestor_can_anchor_filter) const;
+
+  // Setup lip_filter_configuration_ with the transformed plan tree.
+  void concretizeAsLIPFilters(const physical::PhysicalPtr &input,
+                              const physical::PhysicalPtr &anchor_node) const;
+
+  physical::PhysicalPtr pushDownFiltersInternal(
+      const physical::PhysicalPtr &probe_child,
+      const physical::PhysicalPtr &build_child,
+      const physical::FilterJoinPtr &filter_join) const;
+
+  bool findExactMinMaxValuesForAttributeHelper(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      std::int64_t *min_cpp_value,
+      std::int64_t *max_cpp_value) const;
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+  std::unique_ptr<physical::LIPFilterConfiguration> lip_filter_configuration_;
+
+  // TODO(jianqiao): Add this threshold as a gflag.
+  // Note that 1G bits = 128MB
+  static constexpr std::int64_t kMaxFilterSize = 1000000000L;
+
+  DISALLOW_COPY_AND_ASSIGN(InjectJoinFilters);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_INJECT_JOIN_FILTERS_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/tests/OptimizerTextTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/OptimizerTextTest.cpp b/query_optimizer/tests/OptimizerTextTest.cpp
index e17f5c4..07accf6 100644
--- a/query_optimizer/tests/OptimizerTextTest.cpp
+++ b/query_optimizer/tests/OptimizerTextTest.cpp
@@ -34,6 +34,7 @@ namespace optimizer {
 DECLARE_bool(reorder_columns);
 DECLARE_bool(reorder_hash_joins);
 DECLARE_bool(use_lip_filters);
+DECLARE_bool(use_filter_joins);
 
 }
 }
@@ -64,6 +65,7 @@ int main(int argc, char** argv) {
   quickstep::optimizer::FLAGS_reorder_columns = false;
   quickstep::optimizer::FLAGS_reorder_hash_joins = false;
   quickstep::optimizer::FLAGS_use_lip_filters = false;
+  quickstep::optimizer::FLAGS_use_filter_joins = false;
 
   ::testing::InitGoogleTest(&argc, argv);
   int success = RUN_ALL_TESTS();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
new file mode 100644
index 0000000..f7c09cd
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "relational_operators/BuildLIPFilterOperator.hpp"
+
+#include <memory>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
+#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlock.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageManager.hpp"
+#include "storage/TupleIdSequence.hpp"
+#include "storage/TupleStorageSubBlock.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+#include "utility/lip_filter/LIPFilterUtil.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace quickstep {
+
+bool BuildLIPFilterOperator::getAllWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
+  DCHECK(query_context != nullptr);
+
+  const Predicate *build_side_predicate =
+      query_context->getPredicate(build_side_predicate_index_);
+
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id input_block_id : input_relation_block_ids_) {
+        container->addNormalWorkOrder(
+            new BuildLIPFilterWorkOrder(
+                query_id_,
+                input_relation_,
+                input_block_id,
+                build_side_predicate,
+                storage_manager,
+                CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+                CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+            op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addNormalWorkOrder(
+          new BuildLIPFilterWorkOrder(
+              query_id_,
+              input_relation_,
+              input_relation_block_ids_[num_workorders_generated_],
+              build_side_predicate,
+              storage_manager,
+              CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context),
+              CreateLIPFilterBuilderHelper(lip_deployment_index_, query_context)),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+bool BuildLIPFilterOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (input_relation_is_stored_) {
+    if (!started_) {
+      for (const block_id block : input_relation_block_ids_) {
+        container->addWorkOrderProto(createWorkOrderProto(block), op_index_);
+      }
+      started_ = true;
+    }
+    return true;
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+    return done_feeding_input_relation_;
+  }
+}
+
+serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const block_id block) {
+  serialization::WorkOrder *proto = new serialization::WorkOrder;
+  proto->set_work_order_type(serialization::BUILD_LIP_FILTER);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::BuildLIPFilterWorkOrder::relation_id, input_relation_.getID());
+  proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id, block);
+  proto->SetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index,
+                      build_side_predicate_index_);
+  proto->SetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index, lip_deployment_index_);
+
+  return proto;
+}
+
+void BuildLIPFilterWorkOrder::execute() {
+  BlockReference block(
+      storage_manager_->getBlock(build_block_id_, input_relation_));
+
+  // Apply the predicate first.
+  std::unique_ptr<TupleIdSequence> predicate_matches;
+  if (build_side_predicate_ != nullptr) {
+    predicate_matches.reset(block->getMatchesForPredicate(build_side_predicate_));
+  }
+
+  std::unique_ptr<ValueAccessor> accessor(
+      block->getTupleStorageSubBlock().createValueAccessor(predicate_matches.get()));
+
+  if (lip_filter_adaptive_prober_ != nullptr) {
+    // Probe the LIP filters if there are any. Note that the LIP filters to be
+    // probed are for filtering the input relation. They are distinct from the
+    // target LIP filters we are building.
+    std::unique_ptr<TupleIdSequence> matches(
+        lip_filter_adaptive_prober_->filterValueAccessor(accessor.get()));
+    std::unique_ptr<ValueAccessor> filtered_accessor(
+        accessor->createSharedTupleIdSequenceAdapterVirtual(*matches));
+
+    lip_filter_builder_->insertValueAccessor(filtered_accessor.get());
+  } else {
+    lip_filter_builder_->insertValueAccessor(accessor.get());
+  }
+}
+
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/BuildLIPFilterOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
new file mode 100644
index 0000000..5192b40
--- /dev/null
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/RelationalOperator.hpp"
+#include "relational_operators/WorkOrder.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
+#include "utility/lip_filter/LIPFilterBuilder.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogRelationSchema;
+class Predicate;
+class StorageManager;
+class WorkOrderProtosContainer;
+class WorkOrdersContainer;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief An operator which builds LIPFilters on one relation.
+ **/
+class BuildLIPFilterOperator : public RelationalOperator {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @note The LIPFilters' information are not passed explicitly as parameters
+   *       to this constructor, but attached later via RelationalOperator::deployLIPFilters().
+   *
+   * @param query_id The ID of the query to which this operator belongs.
+   * @param input_relation The relation to build LIP filters on.
+   * @param build_side_predicate_index The index of the predicate in QueryContext
+   *        where the predicate is to be applied to the input relation before
+   *        building the LIP filters (or kInvalidPredicateId if no predicate is
+   *        to be applied).
+   * @param input_relation_is_stored If input_relation is a stored relation and
+   *        is fully available to the operator before it can start generating
+   *        workorders.
+   **/
+  BuildLIPFilterOperator(const std::size_t query_id,
+                         const CatalogRelation &input_relation,
+                         const QueryContext::predicate_id build_side_predicate_index,
+                         const bool input_relation_is_stored)
+    : RelationalOperator(query_id),
+      input_relation_(input_relation),
+      build_side_predicate_index_(build_side_predicate_index),
+      input_relation_is_stored_(input_relation_is_stored),
+      input_relation_block_ids_(input_relation_is_stored ? input_relation.getBlocksSnapshot()
+                                                         : std::vector<block_id>()),
+      num_workorders_generated_(0),
+      started_(false) {}
+
+  ~BuildLIPFilterOperator() override {}
+
+  /**
+   * @return The input relation to build LIP filters on.
+   */
+  const CatalogRelation& input_relation() const {
+    return input_relation_;
+  }
+
+  std::string getName() const override {
+    return "BuildLIPFilterOperator";
+  }
+
+  bool getAllWorkOrders(WorkOrdersContainer *container,
+                        QueryContext *query_context,
+                        StorageManager *storage_manager,
+                        const tmb::client_id scheduler_client_id,
+                        tmb::MessageBus *bus) override;
+
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
+  void feedInputBlock(const block_id input_block_id,
+                      const relation_id input_relation_id,
+                      const partition_id part_id) override {
+    input_relation_block_ids_.push_back(input_block_id);
+  }
+
+ private:
+  /**
+   * @brief Create Work Order proto.
+   *
+   * @param block The block id used in the Work Order.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
+  const CatalogRelation &input_relation_;
+  const QueryContext::predicate_id build_side_predicate_index_;
+  const bool input_relation_is_stored_;
+
+  std::vector<block_id> input_relation_block_ids_;
+  std::vector<block_id>::size_type num_workorders_generated_;
+
+  bool started_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterOperator);
+};
+
+/**
+ * @brief A WorkOrder produced by BuildLIPFilterOperator.
+ **/
+class BuildLIPFilterWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this WorkOrder belongs.
+   * @param input_relation The relation to build LIP filters on.
+   * @param build_block_id The block id.
+   * @param build_side_predicate The predicate to be applied to filter the input
+   *        relation before building the LIP filters (or nullptr if no predicate
+   *        is to be applied).
+   * @param storage_manager The StorageManager to use.
+   * @param lip_filter_adaptive_prober The attached LIP filter prober.
+   * @param lip_filter_builder The attached LIP filter builder.
+   **/
+  BuildLIPFilterWorkOrder(const std::size_t query_id,
+                          const CatalogRelationSchema &input_relation,
+                          const block_id build_block_id,
+                          const Predicate *build_side_predicate,
+                          StorageManager *storage_manager,
+                          LIPFilterAdaptiveProber *lip_filter_adaptive_prober,
+                          LIPFilterBuilder *lip_filter_builder)
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
+        build_block_id_(build_block_id),
+        build_side_predicate_(build_side_predicate),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        lip_filter_adaptive_prober_(lip_filter_adaptive_prober),
+        lip_filter_builder_(DCHECK_NOTNULL(lip_filter_builder)) {}
+
+  ~BuildLIPFilterWorkOrder() override {}
+
+  /**
+   * @return The input relation to build LIP filters on.
+   */
+  const CatalogRelationSchema& input_relation() const {
+    return input_relation_;
+  }
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &input_relation_;
+  const block_id build_block_id_;
+  const Predicate *build_side_predicate_;
+
+  StorageManager *storage_manager_;
+
+  std::unique_ptr<LIPFilterAdaptiveProber> lip_filter_adaptive_prober_;
+  std::unique_ptr<LIPFilterBuilder> lip_filter_builder_;
+
+  DISALLOW_COPY_AND_ASSIGN(BuildLIPFilterWorkOrder);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_BUILD_LIP_FILTER_OPERATOR_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c8447f3..c18dc77 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -34,6 +34,7 @@ set_gflags_lib_name ()
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
 add_library(quickstep_relationaloperators_BuildHashOperator BuildHashOperator.cpp BuildHashOperator.hpp)
+add_library(quickstep_relationaloperators_BuildLIPFilterOperator BuildLIPFilterOperator.cpp BuildLIPFilterOperator.hpp)
 add_library(quickstep_relationaloperators_CreateIndexOperator CreateIndexOperator.cpp CreateIndexOperator.hpp)
 add_library(quickstep_relationaloperators_CreateTableOperator CreateTableOperator.cpp CreateTableOperator.hpp)
 add_library(quickstep_relationaloperators_DestroyAggregationStateOperator
@@ -113,6 +114,27 @@ target_link_libraries(quickstep_relationaloperators_BuildHashOperator
                       quickstep_utility_lipfilter_LIPFilterBuilder
                       quickstep_utility_lipfilter_LIPFilterUtil
                       tmb)
+target_link_libraries(quickstep_relationaloperators_BuildLIPFilterOperator
+                      glog
+                      quickstep_catalog_CatalogRelation
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_queryexecution_WorkOrderProtosContainer
+                      quickstep_queryexecution_WorkOrdersContainer
+                      quickstep_relationaloperators_RelationalOperator
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_StorageBlock
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageManager
+                      quickstep_storage_TupleIdSequence
+                      quickstep_storage_TupleStorageSubBlock
+                      quickstep_storage_ValueAccessor
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilterAdaptiveProber
+                      quickstep_utility_lipfilter_LIPFilterBuilder
+                      quickstep_utility_lipfilter_LIPFilterUtil
+                      tmb)
 target_link_libraries(quickstep_relationaloperators_CreateIndexOperator
                       glog
                       quickstep_catalog_CatalogRelation
@@ -483,6 +505,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_queryexecution_QueryContext
                       quickstep_relationaloperators_AggregationOperator
                       quickstep_relationaloperators_BuildHashOperator
+                      quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_DeleteOperator
                       quickstep_relationaloperators_DestroyAggregationStateOperator
                       quickstep_relationaloperators_DestroyHashOperator
@@ -515,6 +538,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
 target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_BuildHashOperator
                       quickstep_relationaloperators_CreateIndexOperator
                       quickstep_relationaloperators_CreateTableOperator

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index f8d9246..76753d2 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -24,25 +24,26 @@ import "relational_operators/SortMergeRunOperator.proto";
 enum WorkOrderType {
   AGGREGATION = 1;
   BUILD_HASH = 2;
-  CREATE_INDEX = 3;  // Placeholder.
-  CREATE_TABLE = 4;  // Placeholder.
-  DELETE = 5;
-  DESTROY_HASH = 6;
-  DROP_TABLE = 7;
-  FINALIZE_AGGREGATION = 8;
-  HASH_JOIN = 9;
-  INSERT = 10;
-  NESTED_LOOP_JOIN = 11;
-  SAMPLE = 12;
-  SAVE_BLOCKS = 13;
-  SELECT = 14;
-  SORT_MERGE_RUN = 15;
-  SORT_RUN_GENERATION = 16;
-  TABLE_GENERATOR = 17;
-  TEXT_SCAN = 18;
-  UPDATE = 19;
-  WINDOW_AGGREGATION = 20;
-  DESTROY_AGGREGATION_STATE = 21;
+  BUILD_LIP_FILTER = 3;
+  CREATE_INDEX = 4;  // Placeholder.
+  CREATE_TABLE = 5;  // Placeholder.
+  DELETE = 6;
+  DESTROY_HASH = 7;
+  DROP_TABLE = 8;
+  FINALIZE_AGGREGATION = 9;
+  HASH_JOIN = 10;
+  INSERT = 11;
+  NESTED_LOOP_JOIN = 12;
+  SAMPLE = 13;
+  SAVE_BLOCKS = 14;
+  SELECT = 15;
+  SORT_MERGE_RUN = 16;
+  SORT_RUN_GENERATION = 17;
+  TABLE_GENERATOR = 18;
+  TEXT_SCAN = 19;
+  UPDATE = 20;
+  WINDOW_AGGREGATION = 21;
+  DESTROY_AGGREGATION_STATE = 22;
 }
 
 message WorkOrder {
@@ -77,6 +78,16 @@ message BuildHashWorkOrder {
   }
 }
 
+message BuildLIPFilterWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 relation_id = 48;
+    optional fixed64 build_block_id = 49;
+    optional int32 build_side_predicate_index = 50;
+    optional int32 lip_deployment_index = 51;
+  }
+}
+
 message DeleteWorkOrder {
   extend WorkOrder {
     // All required.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index a6cba02..5e8d03d 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -30,6 +30,7 @@
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/AggregationOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
+#include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/DeleteOperator.hpp"
 #include "relational_operators/DestroyAggregationStateOperator.hpp"
 #include "relational_operators/DestroyHashOperator.hpp"
@@ -90,6 +91,23 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           CreateLIPFilterAdaptiveProberHelper(
               proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
     }
+    case serialization::BUILD_LIP_FILTER: {
+      LOG(INFO) << "Creating BuildLIPFilterWorkOrder in Shiftboss " << shiftboss_index;
+
+      const QueryContext::lip_deployment_id lip_deployment_index =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index);
+
+      return new BuildLIPFilterWorkOrder(
+          proto.query_id(),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id)),
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index)),
+          storage_manager,
+          CreateLIPFilterAdaptiveProberHelper(lip_deployment_index, query_context),
+          CreateLIPFilterBuilderHelper(lip_deployment_index, query_context));
+    }
     case serialization::BUILD_HASH: {
       LOG(INFO) << "Creating BuildHashWorkOrder in Shiftboss " << shiftboss_index;
       vector<attribute_id> join_key_attributes;
@@ -541,6 +559,33 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
                  proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index),
                  proto.GetExtension(serialization::BuildHashWorkOrder::partition_id));
     }
+    case serialization::BUILD_LIP_FILTER: {
+      if (!proto.HasExtension(serialization::BuildLIPFilterWorkOrder::relation_id)) {
+        return false;
+      }
+
+      const relation_id rel_id =
+          proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+
+      if (!proto.HasExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index)) {
+        return false;
+      } else {
+        const QueryContext::lip_deployment_id lip_deployment_index =
+            proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index);
+        if (lip_deployment_index != QueryContext::kInvalidLIPDeploymentId &&
+            !query_context.isValidLIPDeploymentId(lip_deployment_index)) {
+          return false;
+        }
+      }
+
+      return proto.HasExtension(serialization::BuildLIPFilterWorkOrder::build_block_id) &&
+             proto.HasExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_side_predicate_index));
+    }
     case serialization::DELETE: {
       return proto.HasExtension(serialization::DeleteWorkOrder::relation_id) &&
              catalog_database.hasRelationWithId(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index 8571149..aeff388 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -270,6 +270,7 @@ target_link_libraries(quickstep_utility_PlanVisualizer
                       quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_Physical

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/PlanVisualizer.cpp
----------------------------------------------------------------------
diff --git a/utility/PlanVisualizer.cpp b/utility/PlanVisualizer.cpp
index df7a20c..f8bf6f8 100644
--- a/utility/PlanVisualizer.cpp
+++ b/utility/PlanVisualizer.cpp
@@ -32,6 +32,7 @@
 #include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
@@ -58,6 +59,8 @@ std::string PlanVisualizer::visualize(const P::PhysicalPtr &input) {
 
   color_map_["TableReference"] = "skyblue";
   color_map_["Selection"] = "#90EE90";
+  color_map_["FilterJoin"] = "pink";
+  color_map_["FilterJoin(Anti)"] = "pink";
   color_map_["HashJoin"] = "red";
   color_map_["HashLeftOuterJoin"] = "orange";
   color_map_["HashLeftSemiJoin"] = "orange";
@@ -126,7 +129,8 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     edge_info.dst_node_id = node_id;
     edge_info.dashed = false;
 
-    if (input->getPhysicalType() == P::PhysicalType::kHashJoin &&
+    if ((input->getPhysicalType() == P::PhysicalType::kHashJoin ||
+         input->getPhysicalType() == P::PhysicalType::kFilterJoin) &&
         child == input->children()[1]) {
       edge_info.dashed = true;
     }
@@ -165,6 +169,20 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
       }
       break;
     }
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(input);
+      node_info.labels.emplace_back(input->getName());
+
+      const auto &probe_attributes = filter_join->probe_attributes();
+      const auto &build_attributes = filter_join->build_attributes();
+      for (std::size_t i = 0; i < probe_attributes.size(); ++i) {
+        node_info.labels.emplace_back(
+            probe_attributes[i]->attribute_alias() + " = " +
+                build_attributes[i]->attribute_alias());
+      }
+      break;
+    }
     default: {
       node_info.labels.emplace_back(input->getName());
       break;
@@ -177,7 +195,7 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     if (build_it != build_filters.end()) {
       for (const auto &build_info : build_it->second) {
         node_info.labels.emplace_back(
-            std::string("[LIP build] ") + build_info.build_attribute->attribute_alias());
+            std::string("[LIP build] ") + build_info->build_attribute()->attribute_alias());
       }
     }
     const auto &probe_filters = lip_filter_conf_->getProbeInfoMap();
@@ -185,7 +203,7 @@ void PlanVisualizer::visit(const P::PhysicalPtr &input) {
     if (probe_it != probe_filters.end()) {
       for (const auto &probe_info : probe_it->second) {
         node_info.labels.emplace_back(
-            std::string("[LIP probe] ") + probe_info.probe_attribute->attribute_alias());
+            std::string("[LIP probe] ") + probe_info->probe_attribute()->attribute_alias());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/BitVectorExactFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/BitVectorExactFilter.hpp b/utility/lip_filter/BitVectorExactFilter.hpp
new file mode 100644
index 0000000..6ad0567
--- /dev/null
+++ b/utility/lip_filter/BitVectorExactFilter.hpp
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
+#define QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_
+
+#include <atomic>
+#include <cstdint>
+#include <cstring>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "storage/StorageConstants.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/Type.hpp"
+#include "utility/Macros.hpp"
+#include "utility/lip_filter/LIPFilter.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief A LIP filter that tests the EXACT memberships of elements, i.e. there
+ *        will be neither false positives nor false negatives. The implementation
+ *        is to simply reinterpret_cast a value's byte stream into the specified
+ *        CppType as the underlying bit vector's index. Therefore, to use this
+ *        filter, the corresponding LIP attribute's values must be bounded in a
+ *        reasonably small integer range.
+ */
+template <typename CppType, bool is_anti_filter>
+class BitVectorExactFilter : public LIPFilter {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param min_value The minimum possible value for this filter to set.
+   * @param max_value The maximum possible value for this filter to set.
+   */
+  explicit BitVectorExactFilter(const std::int64_t min_value,
+                                const std::int64_t max_value)
+      : LIPFilter(LIPFilterType::kBitVectorExactFilter),
+        min_value_(static_cast<CppType>(min_value)),
+        max_value_(static_cast<CppType>(max_value)),
+        bit_array_(GetByteSize(max_value - min_value + 1)) {
+    DCHECK_EQ(min_value, static_cast<std::int64_t>(min_value_));
+    DCHECK_EQ(max_value, static_cast<std::int64_t>(max_value_));
+    DCHECK_GE(max_value_, min_value_);
+
+    std::memset(bit_array_.data(),
+                0x0,
+                sizeof(std::atomic<std::uint8_t>) * GetByteSize(max_value - min_value + 1));
+  }
+
+  void insertValueAccessor(ValueAccessor *accessor,
+                           const attribute_id attr_id,
+                           const Type *attr_type) override {
+    InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+      if (attr_type->isNullable()) {
+        this->insertValueAccessorInternal<true>(accessor, attr_id);
+      } else {
+        this->insertValueAccessorInternal<false>(accessor, attr_id);
+      }
+    });
+  }
+
+  std::size_t filterBatch(ValueAccessor *accessor,
+                          const attribute_id attr_id,
+                          const bool is_attr_nullable,
+                          std::vector<tuple_id> *batch,
+                          const std::size_t batch_size) const override {
+    DCHECK(batch != nullptr);
+    DCHECK_LE(batch_size, batch->size());
+
+    return InvokeOnAnyValueAccessor(
+        accessor,
+        [&](auto *accessor) -> std::size_t {  // NOLINT(build/c++11)
+      if (is_attr_nullable) {
+        return this->filterBatchInternal<true>(accessor, attr_id, batch, batch_size);
+      } else {
+        return this->filterBatchInternal<false>(accessor, attr_id, batch, batch_size);
+      }
+    });
+  }
+
+ private:
+  /**
+   * @brief Round up bit_size to multiples of 8.
+   */
+  inline static std::size_t GetByteSize(const std::size_t bit_size) {
+    return (bit_size + 7u) / 8u;
+  }
+
+  /**
+   * @brief Iterate through the accessor and hash values into the internal bit
+   *        array.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline void insertValueAccessorInternal(ValueAccessorT *accessor,
+                                          const attribute_id attr_id) {
+    accessor->beginIteration();
+    while (accessor->next()) {
+      const void *value = accessor->template getUntypedValue<is_attr_nullable>(attr_id);
+      if (!is_attr_nullable || value != nullptr) {
+        insert(value);
+      }
+    }
+  }
+
+  /**
+   * @brief Filter the given batch of tuples from a ValueAccessor. Write the
+   *        tuple ids which survive in the filtering back to \p batch.
+   */
+  template <bool is_attr_nullable, typename ValueAccessorT>
+  inline std::size_t filterBatchInternal(const ValueAccessorT *accessor,
+                                         const attribute_id attr_id,
+                                         std::vector<tuple_id> *batch,
+                                         const std::size_t batch_size) const {
+    std::size_t out_size = 0;
+    for (std::size_t i = 0; i < batch_size; ++i) {
+      const tuple_id tid = batch->at(i);
+      const void *value =
+          accessor->template getUntypedValueAtAbsolutePosition(attr_id, tid);
+      if (is_attr_nullable && value == nullptr) {
+        continue;
+      }
+      if (contains(value)) {
+        batch->at(out_size) = tid;
+        ++out_size;
+      }
+    }
+    return out_size;
+  }
+
+  /**
+   * @brief Inserts a given value into the exact filter.
+   */
+  inline void insert(const void *key_begin) {
+    const CppType value = *reinterpret_cast<const CppType *>(key_begin);
+    DCHECK_GE(value, min_value_);
+    DCHECK_LE(value, max_value_);
+
+    const CppType loc = value - min_value_;
+    bit_array_[loc >> 3u].fetch_or(1u << (loc & 7u), std::memory_order_relaxed);
+  }
+
+  /**
+   * @brief Test membership of a given value in the exact filter.
+   */
+  inline bool contains(const void *key_begin) const {
+    const CppType value = *reinterpret_cast<const CppType *>(key_begin);
+    if (value < min_value_ || value > max_value_) {
+      return is_anti_filter;
+    }
+
+    const CppType loc = value - min_value_;
+    const bool is_bit_set =
+        (bit_array_[loc >> 3u].load(std::memory_order_relaxed) & (1u << (loc & 7u))) != 0;
+
+    if (is_anti_filter) {
+      return !is_bit_set;
+    } else {
+      return is_bit_set;
+    }
+  }
+
+  const CppType min_value_;
+  const CppType max_value_;
+  alignas(kCacheLineBytes) std::vector<std::atomic<std::uint8_t>> bit_array_;
+
+  DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilter);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_LIP_FILTER_BIT_VECTOR_EXACT_FILTER_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utility/lip_filter/CMakeLists.txt b/utility/lip_filter/CMakeLists.txt
index 23b3763..edd0d24 100644
--- a/utility/lip_filter/CMakeLists.txt
+++ b/utility/lip_filter/CMakeLists.txt
@@ -20,6 +20,7 @@ QS_PROTOBUF_GENERATE_CPP(utility_lipfilter_LIPFilter_proto_srcs
                          LIPFilter.proto)
 
 # Declare micro-libs:
+add_library(quickstep_utility_lipfilter_BitVectorExactFilter ../../empty_src.cpp BitVectorExactFilter.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilter ../../empty_src.cpp LIPFilter.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterAdaptiveProber ../../empty_src.cpp LIPFilterAdaptiveProber.hpp)
 add_library(quickstep_utility_lipfilter_LIPFilterBuilder ../../empty_src.cpp LIPFilterBuilder.hpp)
@@ -31,6 +32,15 @@ add_library(quickstep_utility_lipfilter_LIPFilter_proto
 add_library(quickstep_utility_lipfilter_SingleIdentityHashFilter ../../empty_src.cpp SingleIdentityHashFilter.hpp)
 
 # Link dependencies:
+target_link_libraries(quickstep_utility_lipfilter_BitVectorExactFilter
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_StorageConstants
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_utility_lipfilter_LIPFilter
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilter
                       quickstep_catalog_CatalogTypedefs
                       quickstep_storage_StorageBlockInfo
@@ -56,6 +66,7 @@ target_link_libraries(quickstep_utility_lipfilter_LIPFilterDeployment
                       quickstep_utility_lipfilter_LIPFilterBuilder
                       quickstep_utility_lipfilter_LIPFilter_proto)
 target_link_libraries(quickstep_utility_lipfilter_LIPFilterFactory
+                      quickstep_utility_lipfilter_BitVectorExactFilter
                       quickstep_utility_lipfilter_LIPFilter_proto
                       quickstep_utility_lipfilter_SingleIdentityHashFilter
                       quickstep_utility_Macros)

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilter.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.hpp b/utility/lip_filter/LIPFilter.hpp
index 682d69f..ba38264 100644
--- a/utility/lip_filter/LIPFilter.hpp
+++ b/utility/lip_filter/LIPFilter.hpp
@@ -37,8 +37,8 @@ class ValueAccessor;
  */
 
 enum class LIPFilterType {
+  kBitVectorExactFilter,
   kBloomFilter,
-  kExactFilter,
   kSingleIdentityHashFilter
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilter.proto
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilter.proto b/utility/lip_filter/LIPFilter.proto
index def13dd..45843f3 100644
--- a/utility/lip_filter/LIPFilter.proto
+++ b/utility/lip_filter/LIPFilter.proto
@@ -22,8 +22,8 @@ package quickstep.serialization;
 import "types/Type.proto";
 
 enum LIPFilterType {
-  BLOOM_FILTER = 1;
-  EXACT_FILTER = 2;
+  BIT_VECTOR_EXACT_FILTER = 1;
+  BLOOM_FILTER = 2;
   SINGLE_IDENTITY_HASH_FILTER = 3;
 }
 
@@ -33,17 +33,22 @@ message LIPFilter {
   extensions 16 to max;
 }
 
-message SingleIdentityHashFilter {
+message BitVectorExactFilter {
   extend LIPFilter {
     // All required
-    optional uint64 filter_cardinality = 16;
-    optional uint64 attribute_size = 17;
+    optional int64 min_value = 16;
+    optional int64 max_value = 17;
+    optional uint64 attribute_size = 18;
+    optional bool is_anti_filter = 19;
   }
 }
 
-enum LIPFilterActionType {
-  BUILD = 1;
-  PROBE = 2;
+message SingleIdentityHashFilter {
+  extend LIPFilter {
+    // All required
+    optional uint64 filter_cardinality = 24;
+    optional uint64 attribute_size = 25;
+  }
 }
 
 message LIPFilterDeployment {
@@ -53,6 +58,6 @@ message LIPFilterDeployment {
     required Type attribute_type = 3;
   }
 
-  required LIPFilterActionType action_type = 1;
-  repeated Entry entries = 2;
+  repeated Entry build_entries = 1;
+  repeated Entry probe_entries = 2;
 }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterDeployment.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.cpp b/utility/lip_filter/LIPFilterDeployment.cpp
index cd4d90f..5721496 100644
--- a/utility/lip_filter/LIPFilterDeployment.cpp
+++ b/utility/lip_filter/LIPFilterDeployment.cpp
@@ -28,45 +28,49 @@
 #include "utility/lip_filter/LIPFilterBuilder.hpp"
 #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
-#include "glog/logging.h"
-
 namespace quickstep {
 
 LIPFilterDeployment::LIPFilterDeployment(
     const serialization::LIPFilterDeployment &proto,
     const std::vector<std::unique_ptr<LIPFilter>> &lip_filters) {
-  switch (proto.action_type()) {
-    case serialization::LIPFilterActionType::BUILD:
-      action_type_ = LIPFilterActionType::kBuild;
-      break;
-    case serialization::LIPFilterActionType::PROBE:
-      action_type_ = LIPFilterActionType::kProbe;
-      break;
-    default:
-      LOG(FATAL) << "Unsupported LIPFilterActionType: "
-                 << serialization::LIPFilterActionType_Name(proto.action_type());
+  if (proto.build_entries_size() > 0) {
+    build_.reset(new LIPFilterDeploymentInfo());
+    for (int i = 0; i < proto.build_entries_size(); ++i) {
+      const auto &entry_proto = proto.build_entries(i);
+      build_->lip_filters_.emplace_back(
+          lip_filters.at(entry_proto.lip_filter_id()).get());
+      build_->attr_ids_.emplace_back(entry_proto.attribute_id());
+      build_->attr_types_.emplace_back(
+          &TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+    }
   }
 
-  for (int i = 0; i < proto.entries_size(); ++i) {
-    const auto &entry_proto = proto.entries(i);
-    lip_filters_.emplace_back(lip_filters.at(entry_proto.lip_filter_id()).get());
-    attr_ids_.emplace_back(entry_proto.attribute_id());
-    attr_types_.emplace_back(&TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+  if (proto.probe_entries_size() > 0) {
+    probe_.reset(new LIPFilterDeploymentInfo());
+    for (int i = 0; i < proto.probe_entries_size(); ++i) {
+      const auto &entry_proto = proto.probe_entries(i);
+      probe_->lip_filters_.emplace_back(
+          lip_filters.at(entry_proto.lip_filter_id()).get());
+      probe_->attr_ids_.emplace_back(entry_proto.attribute_id());
+      probe_->attr_types_.emplace_back(
+          &TypeFactory::ReconstructFromProto(entry_proto.attribute_type()));
+    }
   }
 }
 
 bool LIPFilterDeployment::ProtoIsValid(
     const serialization::LIPFilterDeployment &proto) {
-  if (proto.action_type() != serialization::LIPFilterActionType::BUILD &&
-      proto.action_type() != serialization::LIPFilterActionType::PROBE) {
-    LOG(FATAL) << "Unsupported LIPFilterActionType: "
-               << serialization::LIPFilterActionType_Name(proto.action_type());
-  }
-  if (proto.entries_size() == 0) {
+  if (proto.build_entries_size() == 0 && proto.probe_entries_size() == 0) {
     return false;
   }
-  for (int i = 0; i < proto.entries_size(); ++i) {
-    const auto &entry_proto = proto.entries(i);
+  for (int i = 0; i < proto.build_entries_size(); ++i) {
+    const auto &entry_proto = proto.build_entries(i);
+    if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) {
+      return false;
+    }
+  }
+  for (int i = 0; i < proto.probe_entries_size(); ++i) {
+    const auto &entry_proto = proto.probe_entries(i);
     if (!TypeFactory::ProtoIsValid(entry_proto.attribute_type())) {
       return false;
     }
@@ -75,13 +79,23 @@ bool LIPFilterDeployment::ProtoIsValid(
 }
 
 LIPFilterBuilder* LIPFilterDeployment::createLIPFilterBuilder() const {
-  DCHECK(action_type_ == LIPFilterActionType::kBuild);
-  return new LIPFilterBuilder(lip_filters_, attr_ids_, attr_types_);
+  if (build_ == nullptr) {
+    return nullptr;
+  }
+
+  return new LIPFilterBuilder(build_->lip_filters_,
+                              build_->attr_ids_,
+                              build_->attr_types_);
 }
 
 LIPFilterAdaptiveProber* LIPFilterDeployment::createLIPFilterAdaptiveProber() const {
-  DCHECK(action_type_ == LIPFilterActionType::kProbe);
-  return new LIPFilterAdaptiveProber(lip_filters_, attr_ids_, attr_types_);
+  if (probe_ == nullptr) {
+    return nullptr;
+  }
+
+  return new LIPFilterAdaptiveProber(probe_->lip_filters_,
+                                     probe_->attr_ids_,
+                                     probe_->attr_types_);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterDeployment.hpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterDeployment.hpp b/utility/lip_filter/LIPFilterDeployment.hpp
index 9b37f88..ab1259b 100644
--- a/utility/lip_filter/LIPFilterDeployment.hpp
+++ b/utility/lip_filter/LIPFilterDeployment.hpp
@@ -39,11 +39,6 @@ class Type;
  *  @{
  */
 
-enum class LIPFilterActionType {
-  kBuild = 0,
-  kProbe
-};
-
 /**
  * @brief Helper class for organizing a group of LIPFilters in the backend.
  *        Each LIPFilterDeployment object is attached to a RelationalOperator.
@@ -69,16 +64,6 @@ class LIPFilterDeployment {
   static bool ProtoIsValid(const serialization::LIPFilterDeployment &proto);
 
   /**
-   * @brief Get the action type for this group of LIPFilters (i.e. whether
-   *        to build or probe the filters).
-   *
-   * @return The action type.
-   */
-  LIPFilterActionType getActionType() const {
-    return action_type_;
-  }
-
-  /**
    * @brief Create a LIPFilterBuilder for this group of LIPFilters.
    *
    * @return A new LIPFilterBuilder object for this group of LIPFilters.
@@ -95,11 +80,14 @@ class LIPFilterDeployment {
   LIPFilterAdaptiveProber* createLIPFilterAdaptiveProber() const;
 
  private:
-  LIPFilterActionType action_type_;
-
-  std::vector<LIPFilter *> lip_filters_;
-  std::vector<attribute_id> attr_ids_;
-  std::vector<const Type *> attr_types_;
+  struct LIPFilterDeploymentInfo {
+    std::vector<LIPFilter *> lip_filters_;
+    std::vector<attribute_id> attr_ids_;
+    std::vector<const Type *> attr_types_;
+  };
+
+  std::unique_ptr<LIPFilterDeploymentInfo> build_;
+  std::unique_ptr<LIPFilterDeploymentInfo> probe_;
 
   DISALLOW_COPY_AND_ASSIGN(LIPFilterDeployment);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/utility/lip_filter/LIPFilterFactory.cpp
----------------------------------------------------------------------
diff --git a/utility/lip_filter/LIPFilterFactory.cpp b/utility/lip_filter/LIPFilterFactory.cpp
index ebc4a0e..f69d8b0 100644
--- a/utility/lip_filter/LIPFilterFactory.cpp
+++ b/utility/lip_filter/LIPFilterFactory.cpp
@@ -23,6 +23,7 @@
 #include <cstdint>
 
 #include "utility/lip_filter/LIPFilter.pb.h"
+#include "utility/lip_filter/BitVectorExactFilter.hpp"
 #include "utility/lip_filter/SingleIdentityHashFilter.hpp"
 
 #include "glog/logging.h"
@@ -31,6 +32,46 @@ namespace quickstep {
 
 LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter &proto) {
   switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::BitVectorExactFilter::attribute_size);
+      const std::int64_t min_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::min_value);
+      const std::int64_t max_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::max_value);
+      const bool is_anti_filter =
+          proto.GetExtension(serialization::BitVectorExactFilter::is_anti_filter);
+
+      switch (attr_size) {
+        case 1:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::int8_t, true>(min_value, max_value);
+          } else {
+            return new BitVectorExactFilter<std::int8_t, false>(min_value, max_value);
+          }
+        case 2:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::int16_t, true>(min_value, max_value);
+          } else {
+            return new BitVectorExactFilter<std::int16_t, false>(min_value, max_value);
+          }
+        case 4:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::int32_t, true>(min_value, max_value);
+          } else {
+            return new BitVectorExactFilter<std::int32_t, false>(min_value, max_value);
+          }
+        case 8:
+          if (is_anti_filter) {
+            return new BitVectorExactFilter<std::int64_t, true>(min_value, max_value);
+          } else {
+            return new BitVectorExactFilter<std::int64_t, false>(min_value, max_value);
+          }
+        default:
+          LOG(FATAL) << "Invalid attribute size for BitVectorExactFilter: "
+                     << attr_size;
+      }
+    }
     case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
       const std::size_t attr_size =
           proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);
@@ -57,6 +98,15 @@ LIPFilter* LIPFilterFactory::ReconstructFromProto(const serialization::LIPFilter
 
 bool LIPFilterFactory::ProtoIsValid(const serialization::LIPFilter &proto) {
   switch (proto.lip_filter_type()) {
+    case serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER: {
+      const std::size_t attr_size =
+          proto.GetExtension(serialization::BitVectorExactFilter::attribute_size);
+      const std::int64_t min_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::min_value);
+      const std::int64_t max_value =
+          proto.GetExtension(serialization::BitVectorExactFilter::max_value);
+      return (attr_size != 0 && max_value >= min_value);
+    }
     case serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER: {
       const std::size_t attr_size =
           proto.GetExtension(serialization::SingleIdentityHashFilter::attribute_size);


[2/2] incubator-quickstep git commit: Add BitVectorExactFilter as a LIP filter and supports Join-to-Semijoin transformation.

Posted by hb...@apache.org.
Add BitVectorExactFilter as a LIP filter and supports Join-to-Semijoin transformation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4ba819c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4ba819c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4ba819c5

Branch: refs/heads/master
Commit: 4ba819c5b82af1d9284525bd7a16784e0254be3f
Parents: 5ffdaaf
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Oct 27 14:16:32 2016 -0500
Committer: Jianqiao Zhu <ji...@cs.wisc.edu>
Committed: Tue Jan 31 16:57:09 2017 -0600

----------------------------------------------------------------------
 query_optimizer/CMakeLists.txt                  |   4 +
 query_optimizer/ExecutionGenerator.cpp          |  62 +++
 query_optimizer/ExecutionGenerator.hpp          |   8 +
 query_optimizer/LIPFilterGenerator.cpp          | 109 +++--
 query_optimizer/LIPFilterGenerator.hpp          |  49 ++-
 query_optimizer/PhysicalGenerator.cpp           |  19 +-
 query_optimizer/cost_model/CMakeLists.txt       |   5 +
 query_optimizer/cost_model/SimpleCostModel.cpp  |   9 +
 query_optimizer/cost_model/SimpleCostModel.hpp  |   5 +
 .../cost_model/StarSchemaSimpleCostModel.cpp    | 163 ++++++-
 .../cost_model/StarSchemaSimpleCostModel.hpp    |  83 ++++
 query_optimizer/expressions/ExpressionUtil.hpp  |   8 +-
 query_optimizer/physical/CMakeLists.txt         |  14 +
 query_optimizer/physical/FilterJoin.cpp         | 115 +++++
 query_optimizer/physical/FilterJoin.hpp         | 187 ++++++++
 .../physical/LIPFilterConfiguration.hpp         | 265 ++++++++---
 query_optimizer/physical/PatternMatcher.hpp     |   2 +
 query_optimizer/physical/PhysicalType.hpp       |   1 +
 query_optimizer/physical/TopLevelPlan.hpp       |   3 +-
 query_optimizer/rules/AttachLIPFilters.cpp      |  28 +-
 query_optimizer/rules/CMakeLists.txt            |  22 +
 query_optimizer/rules/InjectJoinFilters.cpp     | 438 +++++++++++++++++++
 query_optimizer/rules/InjectJoinFilters.hpp     | 116 +++++
 query_optimizer/tests/OptimizerTextTest.cpp     |   2 +
 relational_operators/BuildLIPFilterOperator.cpp | 154 +++++++
 relational_operators/BuildLIPFilterOperator.hpp | 200 +++++++++
 relational_operators/CMakeLists.txt             |  24 +
 relational_operators/WorkOrder.proto            |  49 ++-
 relational_operators/WorkOrderFactory.cpp       |  45 ++
 utility/CMakeLists.txt                          |   1 +
 utility/PlanVisualizer.cpp                      |  24 +-
 utility/lip_filter/BitVectorExactFilter.hpp     | 202 +++++++++
 utility/lip_filter/CMakeLists.txt               |  11 +
 utility/lip_filter/LIPFilter.hpp                |   2 +-
 utility/lip_filter/LIPFilter.proto              |  25 +-
 utility/lip_filter/LIPFilterDeployment.cpp      |  72 +--
 utility/lip_filter/LIPFilterDeployment.hpp      |  28 +-
 utility/lip_filter/LIPFilterFactory.cpp         |  50 +++
 38 files changed, 2394 insertions(+), 210 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 0ca971d..7f90e11 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -96,6 +96,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
+                      quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_InsertSelection
                       quickstep_queryoptimizer_physical_InsertTuple
@@ -115,6 +116,7 @@ target_link_libraries(quickstep_queryoptimizer_ExecutionGenerator
                       quickstep_queryoptimizer_physical_WindowAggregate
                       quickstep_relationaloperators_AggregationOperator
                       quickstep_relationaloperators_BuildHashOperator
+                      quickstep_relationaloperators_BuildLIPFilterOperator
                       quickstep_relationaloperators_CreateIndexOperator
                       quickstep_relationaloperators_CreateTableOperator
                       quickstep_relationaloperators_DeleteOperator
@@ -161,6 +163,7 @@ target_link_libraries(quickstep_queryoptimizer_LIPFilterGenerator
                       quickstep_queryoptimizer_QueryPlan
                       quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_LIPFilterConfiguration
                       quickstep_queryoptimizer_physical_Physical
@@ -206,6 +209,7 @@ target_link_libraries(quickstep_queryoptimizer_PhysicalGenerator
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
                       quickstep_queryoptimizer_rules_ReorderColumns

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index e25b8ad..ce1452e 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -76,6 +76,7 @@
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/InsertSelection.hpp"
 #include "query_optimizer/physical/InsertTuple.hpp"
@@ -95,6 +96,7 @@
 #include "query_optimizer/physical/WindowAggregate.hpp"
 #include "relational_operators/AggregationOperator.hpp"
 #include "relational_operators/BuildHashOperator.hpp"
+#include "relational_operators/BuildLIPFilterOperator.hpp"
 #include "relational_operators/CreateIndexOperator.hpp"
 #include "relational_operators/CreateTableOperator.hpp"
 #include "relational_operators/DeleteOperator.hpp"
@@ -271,6 +273,9 @@ void ExecutionGenerator::generatePlanInternal(
     case P::PhysicalType::kDropTable:
       return convertDropTable(
           std::static_pointer_cast<const P::DropTable>(physical_plan));
+    case P::PhysicalType::kFilterJoin:
+      return convertFilterJoin(
+          std::static_pointer_cast<const P::FilterJoin>(physical_plan));
     case P::PhysicalType::kHashJoin:
       return convertHashJoin(
           std::static_pointer_cast<const P::HashJoin>(physical_plan));
@@ -608,6 +613,63 @@ void ExecutionGenerator::convertSharedSubplanReference(const physical::SharedSub
   }
 }
 
+void ExecutionGenerator::convertFilterJoin(const P::FilterJoinPtr &physical_plan) {
+  P::PhysicalPtr probe_physical = physical_plan->left();
+  P::PhysicalPtr build_physical = physical_plan->right();
+
+  // Let B denote the build side child. If B is also a FilterJoin, then the
+  // actual "concrete" input relation is B's probe side child, and B's build
+  // side becomes a LIPFilter that is attached to the BuildLIPFilterOperator
+  // created below.
+  P::FilterJoinPtr filter_join;
+  if (P::SomeFilterJoin::MatchesWithConditionalCast(build_physical, &filter_join)) {
+    build_physical = filter_join->left();
+    DCHECK(build_physical->getPhysicalType() != P::PhysicalType::kFilterJoin);
+  }
+
+  // Convert the predicate proto.
+  QueryContext::predicate_id build_side_predicate_index = QueryContext::kInvalidPredicateId;
+  if (physical_plan->build_side_filter_predicate()) {
+    build_side_predicate_index = query_context_proto_->predicates_size();
+
+    std::unique_ptr<const Predicate> build_side_predicate(
+        convertPredicate(physical_plan->build_side_filter_predicate()));
+    query_context_proto_->add_predicates()->CopyFrom(build_side_predicate->getProto());
+  }
+
+  const CatalogRelationInfo *probe_relation_info =
+      findRelationInfoOutputByPhysical(probe_physical);
+  const CatalogRelationInfo *build_relation_info =
+      findRelationInfoOutputByPhysical(build_physical);
+
+  // Create a BuildLIPFilterOperator for the FilterJoin. This operator builds
+  // LIP filters that are applied properly in downstream operators to achieve
+  // the filter-join semantics.
+  const QueryPlan::DAGNodeIndex build_filter_operator_index =
+      execution_plan_->addRelationalOperator(
+          new BuildLIPFilterOperator(
+              query_handle_->query_id(),
+              *build_relation_info->relation,
+              build_side_predicate_index,
+              build_relation_info->isStoredRelation()));
+
+  if (!build_relation_info->isStoredRelation()) {
+    execution_plan_->addDirectDependency(build_filter_operator_index,
+                                         build_relation_info->producer_operator_index,
+                                         false /* is_pipeline_breaker */);
+  }
+
+  physical_to_output_relation_map_.emplace(
+      std::piecewise_construct,
+      std::forward_as_tuple(physical_plan),
+      std::forward_as_tuple(probe_relation_info->producer_operator_index,
+                            probe_relation_info->relation));
+
+  DCHECK(lip_filter_generator_ != nullptr);
+  lip_filter_generator_->addFilterJoinInfo(physical_plan,
+                                           build_filter_operator_index);
+}
+
 void ExecutionGenerator::convertHashJoin(const P::HashJoinPtr &physical_plan) {
   // HashJoin is converted to three operators:
   //     BuildHash, HashJoin, DestroyHash. The second is the primary operator.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/ExecutionGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.hpp b/query_optimizer/ExecutionGenerator.hpp
index 55197c9..eba6eee 100644
--- a/query_optimizer/ExecutionGenerator.hpp
+++ b/query_optimizer/ExecutionGenerator.hpp
@@ -46,6 +46,7 @@
 #include "query_optimizer/physical/CreateTable.hpp"
 #include "query_optimizer/physical/DeleteTuples.hpp"
 #include "query_optimizer/physical/DropTable.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/InsertSelection.hpp"
 #include "query_optimizer/physical/InsertTuple.hpp"
@@ -248,6 +249,13 @@ class ExecutionGenerator {
   void convertSharedSubplanReference(const physical::SharedSubplanReferencePtr &physical_plan);
 
   /**
+   * @brief Converts a FilterJoin to a BuildLIPFilter operator.
+   *
+   * @param physical_plan The FilterJoin to be converted.
+   */
+  void convertFilterJoin(const physical::FilterJoinPtr &physical_plan);
+
+  /**
    * @brief Converts a HashJoin to BuildHash, HashJoin and
    *        DestroyHash operators.
    *

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/LIPFilterGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.cpp b/query_optimizer/LIPFilterGenerator.cpp
index 404037e..2ce2ea8 100644
--- a/query_optimizer/LIPFilterGenerator.cpp
+++ b/query_optimizer/LIPFilterGenerator.cpp
@@ -20,11 +20,13 @@
 #include "query_optimizer/LIPFilterGenerator.hpp"
 
 #include <map>
+#include <memory>
 #include <utility>
 #include <vector>
 
 #include "catalog/CatalogAttribute.hpp"
 #include "query_execution/QueryContext.pb.h"
+#include "query_optimizer/physical/LIPFilterConfiguration.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "types/Type.hpp"
 #include "utility/lip_filter/LIPFilter.hpp"
@@ -47,7 +49,7 @@ void LIPFilterGenerator::registerAttributeMap(
   if (build_it != build_info_map.end()) {
     auto &map_entry = attribute_map_[node];
     for (const auto &info : build_it->second) {
-      E::ExprId attr_id = info.build_attribute->id();
+      E::ExprId attr_id = info->build_attribute()->id();
       map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id));
     }
   }
@@ -57,15 +59,16 @@ void LIPFilterGenerator::registerAttributeMap(
   if (probe_it != probe_info_map.end()) {
     auto &map_entry = attribute_map_[node];
     for (const auto &info : probe_it->second) {
-      E::ExprId attr_id = info.probe_attribute->id();
+      E::ExprId attr_id = info->probe_attribute()->id();
       map_entry.emplace(attr_id, attribute_substitution_map.at(attr_id));
     }
   }
 }
 
 void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
-                                          serialization::QueryContext *query_context_proto) const {
-  LIPFilterBuilderMap lip_filter_builder_map;
+                                          serialization::QueryContext *query_context_proto) {
+  lip_filter_builder_map_.clear();
+  lip_filter_deployment_protos_.clear();
 
   // Deploy builders
   const auto &build_info_map = lip_filter_configuration_->getBuildInfoMap();
@@ -76,8 +79,7 @@ void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
                             query_context_proto,
                             info.builder_node,
                             info.builder_operator_index,
-                            build_it->second,
-                            &lip_filter_builder_map);
+                            build_it->second);
     }
   }
 
@@ -90,10 +92,16 @@ void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
                           query_context_proto,
                           info.prober_node,
                           info.prober_operator_index,
-                          probe_it->second,
-                          lip_filter_builder_map);
+                          probe_it->second);
     }
   }
+
+  // Attach LIPFilterDeployment information to the RelationalOperators.
+  for (const auto &entry : lip_filter_deployment_protos_) {
+    RelationalOperator *relop =
+        execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(entry.first);
+    relop->deployLIPFilters(entry.second.first);
+  }
 }
 
 void LIPFilterGenerator::deployBuilderInternal(
@@ -101,30 +109,46 @@ void LIPFilterGenerator::deployBuilderInternal(
     serialization::QueryContext *query_context_proto,
     const physical::PhysicalPtr &builder_node,
     const QueryPlan::DAGNodeIndex builder_operator_index,
-    const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
-    LIPFilterBuilderMap *lip_filter_builder_map) const {
-  const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+    const std::vector<physical::LIPFilterBuildInfoPtr> &build_info_vec) {
   auto *lip_filter_deployment_info_proto =
-      query_context_proto->add_lip_filter_deployments();
-  lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::BUILD);
+      getLIPFilterDeploymentProto(builder_operator_index, query_context_proto);
 
   const auto &builder_attribute_map = attribute_map_.at(builder_node);
   for (const auto &info : build_info_vec) {
     // Add the LIPFilter information into query context.
     const QueryContext::lip_filter_id lip_filter_id = query_context_proto->lip_filters_size();
     serialization::LIPFilter *lip_filter_proto = query_context_proto->add_lip_filters();
-    const CatalogAttribute *target_attr = builder_attribute_map.at(info.build_attribute->id());
+    const CatalogAttribute *target_attr =
+        builder_attribute_map.at(info->build_attribute()->id());
     const Type &attr_type = target_attr->getType();
 
-    switch (info.filter_type) {
+    switch (info->filter_type()) {
       case LIPFilterType::kSingleIdentityHashFilter: {
         DCHECK(!attr_type.isVariableLength());
+        const P::SingleIdentityHashFilterBuildInfo &sihf_info =
+            *std::static_pointer_cast<const P::SingleIdentityHashFilterBuildInfo>(info);
         lip_filter_proto->set_lip_filter_type(
             serialization::LIPFilterType::SINGLE_IDENTITY_HASH_FILTER);
-        lip_filter_proto->SetExtension(
-            serialization::SingleIdentityHashFilter::filter_cardinality, info.filter_cardinality);
-        lip_filter_proto->SetExtension(
-            serialization::SingleIdentityHashFilter::attribute_size, attr_type.minimumByteLength());
+        lip_filter_proto->SetExtension(serialization::SingleIdentityHashFilter::filter_cardinality,
+                                       sihf_info.filter_cardinality());
+        lip_filter_proto->SetExtension(serialization::SingleIdentityHashFilter::attribute_size,
+                                       attr_type.minimumByteLength());
+        break;
+      }
+      case LIPFilterType::kBitVectorExactFilter: {
+        DCHECK(!attr_type.isVariableLength());
+        const P::BitVectorExactFilterBuildInfo &bvef_info =
+            *std::static_pointer_cast<const P::BitVectorExactFilterBuildInfo>(info);
+        lip_filter_proto->set_lip_filter_type(
+            serialization::LIPFilterType::BIT_VECTOR_EXACT_FILTER);
+        lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::min_value,
+                                       bvef_info.min_value());
+        lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::max_value,
+                                       bvef_info.max_value());
+        lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::attribute_size,
+                                       attr_type.minimumByteLength());
+        lip_filter_proto->SetExtension(serialization::BitVectorExactFilter::is_anti_filter,
+                                       bvef_info.is_anti_filter());
         break;
       }
       default:
@@ -133,21 +157,16 @@ void LIPFilterGenerator::deployBuilderInternal(
     }
 
     // Register the builder information which is needed later by the probers.
-    lip_filter_builder_map->emplace(
-        std::make_pair(info.build_attribute->id(), builder_node),
+    lip_filter_builder_map_.emplace(
+        std::make_pair(info->build_attribute()->id(), builder_node),
         std::make_pair(lip_filter_id, builder_operator_index));
 
     // Add the builder deployment information into query context.
-    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_build_entries();
     lip_filter_entry_proto->set_lip_filter_id(lip_filter_id);
     lip_filter_entry_proto->set_attribute_id(target_attr->getID());
     lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(attr_type.getProto());
   }
-
-  // Attach the LIPFilterDeployment information to the RelationalOperator.
-  RelationalOperator *relop =
-      execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(builder_operator_index);
-  relop->deployLIPFilters(lip_deployment_index);
 }
 
 void LIPFilterGenerator::deployProberInteral(
@@ -155,23 +174,21 @@ void LIPFilterGenerator::deployProberInteral(
     serialization::QueryContext *query_context_proto,
     const physical::PhysicalPtr &prober_node,
     const QueryPlan::DAGNodeIndex prober_operator_index,
-    const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
-    const LIPFilterBuilderMap &lip_filter_builder_map) const {
-  const auto lip_deployment_index = query_context_proto->lip_filter_deployments_size();
+    const std::vector<physical::LIPFilterProbeInfoPtr> &probe_info_vec) {
   auto *lip_filter_deployment_info_proto =
-      query_context_proto->add_lip_filter_deployments();
-  lip_filter_deployment_info_proto->set_action_type(serialization::LIPFilterActionType::PROBE);
+      getLIPFilterDeploymentProto(prober_operator_index, query_context_proto);
 
   const auto &prober_attribute_map = attribute_map_.at(prober_node);
   for (const auto &info : probe_info_vec) {
     // Find the corresponding builder for the to-be-probed LIPFilter.
     const auto &builder_info =
-        lip_filter_builder_map.at(
-            std::make_pair(info.build_attribute->id(), info.builder));
-    const CatalogAttribute *target_attr = prober_attribute_map.at(info.probe_attribute->id());
+        lip_filter_builder_map_.at(
+            std::make_pair(info->build_attribute()->id(), info->builder()));
+    const CatalogAttribute *target_attr =
+        prober_attribute_map.at(info->probe_attribute()->id());
 
     // Add the prober deployment information into query context.
-    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_entries();
+    auto *lip_filter_entry_proto = lip_filter_deployment_info_proto->add_probe_entries();
     lip_filter_entry_proto->set_lip_filter_id(builder_info.first);
     lip_filter_entry_proto->set_attribute_id(target_attr->getID());
     lip_filter_entry_proto->mutable_attribute_type()->CopyFrom(
@@ -183,11 +200,23 @@ void LIPFilterGenerator::deployProberInteral(
                                                  builder_info.second,
                                                  true /* is_pipeline_breaker */);
   }
+}
 
-  // Attach the LIPFilterDeployment information to the RelationalOperator.
-  RelationalOperator *relop =
-      execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(prober_operator_index);
-  relop->deployLIPFilters(lip_deployment_index);
+serialization::LIPFilterDeployment* LIPFilterGenerator::getLIPFilterDeploymentProto(
+    const QueryPlan::DAGNodeIndex op_index,
+    serialization::QueryContext *query_context_proto) {
+  const auto proto_it = lip_filter_deployment_protos_.find(op_index);
+  if (proto_it == lip_filter_deployment_protos_.end()) {
+    const int lip_deployment_index =
+        query_context_proto->lip_filter_deployments_size();
+    auto *lip_filter_deployment_proto =
+        query_context_proto->add_lip_filter_deployments();
+    lip_filter_deployment_protos_.emplace(
+        op_index, std::make_pair(lip_deployment_index, lip_filter_deployment_proto));
+    return lip_filter_deployment_proto;
+  } else {
+    return proto_it->second.second;
+  }
 }
 
 }  // namespace optimizer

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/LIPFilterGenerator.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.hpp b/query_optimizer/LIPFilterGenerator.hpp
index 9d191a1..750499d 100644
--- a/query_optimizer/LIPFilterGenerator.hpp
+++ b/query_optimizer/LIPFilterGenerator.hpp
@@ -30,6 +30,7 @@
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/physical/LIPFilterConfiguration.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/Selection.hpp"
@@ -39,7 +40,12 @@
 
 namespace quickstep {
 
-namespace serialization { class QueryContext; }
+namespace serialization {
+
+class QueryContext;
+class LIPFilterDeployment;
+
+}
 
 class CatalogAttribute;
 
@@ -93,6 +99,20 @@ class LIPFilterGenerator {
 
   /**
    * @brief Add physical-to-execution mapping information for deploying LIPFilters
+   *        to a FilterJoin node.
+   *
+   * @param filter_join A physical FilterJoin node.
+   * @param build_filter_operator_index The index of the BuildLIPFilterOperator
+   *        that corresponds to \p filter_join in the execution plan.
+   */
+  void addFilterJoinInfo(const physical::FilterJoinPtr &filter_join,
+                         const QueryPlan::DAGNodeIndex build_filter_operator_index) {
+    builder_infos_.emplace_back(filter_join, build_filter_operator_index);
+    prober_infos_.emplace_back(filter_join, build_filter_operator_index);
+  }
+
+  /**
+   * @brief Add physical-to-execution mapping information for deploying LIPFilters
    *        to a hash-join.
    *
    * @param hash_join A physical HashJoin node.
@@ -128,7 +148,7 @@ class LIPFilterGenerator {
    * @param query_context_proto QueryContext protobuf for the execution plan.
    */
   void deployLIPFilters(QueryPlan *execution_plan,
-                        serialization::QueryContext *query_context_proto) const;
+                        serialization::QueryContext *query_context_proto);
 
  private:
   /**
@@ -157,24 +177,21 @@ class LIPFilterGenerator {
     const QueryPlan::DAGNodeIndex prober_operator_index;
   };
 
-  // Maps each LIPFilter's building attribute to the LIPFilter's id in QueryContext
-  // as well as the LIPFilter's building relational operator's index.
-  typedef std::map<std::pair<expressions::ExprId, physical::PhysicalPtr>,
-                   std::pair<QueryContext::lip_filter_id, QueryPlan::DAGNodeIndex>> LIPFilterBuilderMap;
-
   void deployBuilderInternal(QueryPlan *execution_plan,
                              serialization::QueryContext *query_context_proto,
                              const physical::PhysicalPtr &builder_node,
                              const QueryPlan::DAGNodeIndex builder_operator_index,
-                             const std::vector<physical::LIPFilterBuildInfo> &build_info_vec,
-                             LIPFilterBuilderMap *lip_filter_builder_map) const;
+                             const std::vector<physical::LIPFilterBuildInfoPtr> &build_info_vec);
 
   void deployProberInteral(QueryPlan *execution_plan,
                            serialization::QueryContext *query_context_proto,
                            const physical::PhysicalPtr &prober_node,
                            const QueryPlan::DAGNodeIndex prober_operator_index,
-                           const std::vector<physical::LIPFilterProbeInfo> &probe_info_vec,
-                           const LIPFilterBuilderMap &lip_filter_builder_map) const;
+                           const std::vector<physical::LIPFilterProbeInfoPtr> &probe_info_vec);
+
+  serialization::LIPFilterDeployment* getLIPFilterDeploymentProto(
+      const QueryPlan::DAGNodeIndex op_index,
+      serialization::QueryContext *query_context_proto);
 
   const physical::LIPFilterConfigurationPtr lip_filter_configuration_;
 
@@ -183,6 +200,16 @@ class LIPFilterGenerator {
 
   std::map<physical::PhysicalPtr, std::map<expressions::ExprId, const CatalogAttribute *>> attribute_map_;
 
+  // Maps each LIPFilter's building attribute to the LIPFilter's id in QueryContext
+  // as well as the LIPFilter's building relational operator's index.
+  std::map<std::pair<expressions::ExprId, physical::PhysicalPtr>,
+           std::pair<QueryContext::lip_filter_id, QueryPlan::DAGNodeIndex>> lip_filter_builder_map_;
+
+  // Maps each relational operator's index to the attached LIPFilterDeployment's
+  // index and proto.
+  std::map<QueryPlan::DAGNodeIndex,
+           std::pair<int, serialization::LIPFilterDeployment *>> lip_filter_deployment_protos_;
+
   DISALLOW_COPY_AND_ASSIGN(LIPFilterGenerator);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/PhysicalGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index bd05267..5dc0ffb 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/InjectJoinFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
 #include "query_optimizer/rules/ReorderColumns.hpp"
@@ -56,6 +57,14 @@ DEFINE_bool(reorder_hash_joins, true,
             "cardinality and selective tables to be joined first, which is suitable "
             "for queries on star-schema tables.");
 
+DEFINE_bool(use_filter_joins, true,
+            "If true, apply an optimization that strength-reduces HashJoins to "
+            "FilterJoins (implemented as LIPFilters attached to some anchoring "
+            "operators. Briefly speaking, in the case that the join attribute has "
+            "consecutive integer values bounded in a reasonably small range, we "
+            "build a BitVector on the build-side attribute and use the BitVector "
+            "to filter the probe side table.");
+
 DEFINE_bool(use_lip_filters, true,
             "If true, use LIP (Lookahead Information Passing) filters to accelerate "
             "query processing. LIP filters are effective for queries on star schema "
@@ -133,9 +142,13 @@ P::PhysicalPtr PhysicalGenerator::optimizePlan() {
     rules.emplace_back(new ReorderColumns());
   }
 
-  // NOTE(jianqiao): Adding rules after AttachLIPFilters requires extra handling
-  // of LIPFilterConfiguration for transformed nodes. So currently it is suggested
-  // that all the new rules be placed before this point.
+  // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
+  // extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
+  // suggested that all the new rules be placed before this point.
+  if (FLAGS_use_filter_joins) {
+    rules.emplace_back(new InjectJoinFilters());
+  }
+
   if (FLAGS_use_lip_filters) {
     rules.emplace_back(new AttachLIPFilters());
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/cost_model/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/CMakeLists.txt b/query_optimizer/cost_model/CMakeLists.txt
index 90133e7..5f28bb3 100644
--- a/query_optimizer/cost_model/CMakeLists.txt
+++ b/query_optimizer/cost_model/CMakeLists.txt
@@ -33,6 +33,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_SimpleCostModel
                       quickstep_catalog_CatalogRelationStatistics
                       quickstep_queryoptimizer_costmodel_CostModel
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
                       quickstep_queryoptimizer_physical_Physical
@@ -49,6 +50,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       glog
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationStatistics
+                      quickstep_catalog_CatalogTypedefs
                       quickstep_queryoptimizer_costmodel_CostModel
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ComparisonExpression
@@ -60,6 +62,7 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_NestedLoopsJoin
                       quickstep_queryoptimizer_physical_PatternMatcher
@@ -72,6 +75,8 @@ target_link_libraries(quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostMod
                       quickstep_queryoptimizer_physical_TableReference
                       quickstep_queryoptimizer_physical_TopLevelPlan
                       quickstep_queryoptimizer_physical_WindowAggregate
+                      quickstep_types_NullType
+                      quickstep_types_TypedValue
                       quickstep_utility_Macros)
 
 # Module all-in-one library:

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/cost_model/SimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.cpp b/query_optimizer/cost_model/SimpleCostModel.cpp
index 7808898..e9d2e3a 100644
--- a/query_optimizer/cost_model/SimpleCostModel.cpp
+++ b/query_optimizer/cost_model/SimpleCostModel.cpp
@@ -27,6 +27,7 @@
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
@@ -61,6 +62,9 @@ std::size_t SimpleCostModel::estimateCardinality(
     case P::PhysicalType::kTableGenerator:
       return estimateCardinalityForTableGenerator(
           std::static_pointer_cast<const P::TableGenerator>(physical_plan));
+    case P::PhysicalType::kFilterJoin:
+      return estimateCardinalityForFilterJoin(
+          std::static_pointer_cast<const P::FilterJoin>(physical_plan));
     case P::PhysicalType::kHashJoin:
       return estimateCardinalityForHashJoin(
           std::static_pointer_cast<const P::HashJoin>(physical_plan));
@@ -119,6 +123,11 @@ std::size_t SimpleCostModel::estimateCardinalityForTableGenerator(
   return physical_plan->generator_function_handle()->getEstimatedCardinality();
 }
 
+std::size_t SimpleCostModel::estimateCardinalityForFilterJoin(
+    const P::FilterJoinPtr &physical_plan) {
+  return estimateCardinality(physical_plan->left());
+}
+
 std::size_t SimpleCostModel::estimateCardinalityForHashJoin(
     const P::HashJoinPtr &physical_plan) {
   return std::max(estimateCardinality(physical_plan->left()),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/cost_model/SimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/SimpleCostModel.hpp b/query_optimizer/cost_model/SimpleCostModel.hpp
index 16366cd..4edc2fe 100644
--- a/query_optimizer/cost_model/SimpleCostModel.hpp
+++ b/query_optimizer/cost_model/SimpleCostModel.hpp
@@ -26,6 +26,7 @@
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/Selection.hpp"
@@ -80,6 +81,10 @@ class SimpleCostModel : public CostModel {
   std::size_t estimateCardinalityForSort(
       const physical::SortPtr &physical_plan);
 
+  // Returns the left child's cardinality
+  std::size_t estimateCardinalityForFilterJoin(
+      const physical::FilterJoinPtr &physical_plan);
+
   // Returns the larger value of the estimated cardinalities of two
   // input plans.
   std::size_t estimateCardinalityForHashJoin(

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
index 75b1b2b..7afa1c3 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.cpp
@@ -21,11 +21,11 @@
 
 #include <algorithm>
 #include <memory>
-#include <unordered_map>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogRelationStatistics.hpp"
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ComparisonExpression.hpp"
@@ -38,6 +38,7 @@
 #include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/PatternMatcher.hpp"
 #include "query_optimizer/physical/Physical.hpp"
@@ -48,6 +49,8 @@
 #include "query_optimizer/physical/TableGenerator.hpp"
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/TypedValue.hpp"
+#include "types/NullType.hpp"
 
 #include "glog/logging.h"
 
@@ -73,6 +76,9 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinality(
     case P::PhysicalType::kTableGenerator:
       return estimateCardinalityForTableGenerator(
           std::static_pointer_cast<const P::TableGenerator>(physical_plan));
+    case P::PhysicalType::kFilterJoin:
+      return estimateCardinalityForFilterJoin(
+          std::static_pointer_cast<const P::FilterJoin>(physical_plan));
     case P::PhysicalType::kHashJoin:
       return estimateCardinalityForHashJoin(
           std::static_pointer_cast<const P::HashJoin>(physical_plan));
@@ -134,6 +140,17 @@ std::size_t StarSchemaSimpleCostModel::estimateCardinalityForTableGenerator(
   return physical_plan->generator_function_handle()->getEstimatedCardinality();
 }
 
+std::size_t StarSchemaSimpleCostModel::estimateCardinalityForFilterJoin(
+    const P::FilterJoinPtr &physical_plan) {
+  double build_side_filter_selectivity =
+      estimateSelectivityForPredicate(physical_plan->build_side_filter_predicate(),
+                                      physical_plan->right());
+  std::size_t left_cardinality = estimateCardinality(physical_plan->left());
+  double right_selectivity = estimateSelectivity(physical_plan->right());
+  return static_cast<std::size_t>(
+      left_cardinality * build_side_filter_selectivity * right_selectivity + 0.5);
+}
+
 std::size_t StarSchemaSimpleCostModel::estimateCardinalityForHashJoin(
     const P::HashJoinPtr &physical_plan) {
   std::size_t left_cardinality = estimateCardinality(physical_plan->left());
@@ -216,6 +233,18 @@ std::size_t StarSchemaSimpleCostModel::estimateNumDistinctValues(
       }
       break;
     }
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr &filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(physical_plan);
+      if (E::ContainsExprId(filter_join->left()->getOutputAttributes(), attribute_id)) {
+        std::size_t left_child_num_distinct_values =
+            estimateNumDistinctValues(attribute_id, filter_join->left());
+        double right_child_selectivity =
+            estimateSelectivity(filter_join->right());
+        return static_cast<std::size_t>(
+            left_child_num_distinct_values * right_child_selectivity + 0.5);
+      }
+    }
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);
@@ -254,6 +283,16 @@ double StarSchemaSimpleCostModel::estimateSelectivity(
       double child_selectivity = estimateSelectivity(selection->input());
       return filter_selectivity * child_selectivity;
     }
+    case P::PhysicalType::kFilterJoin: {
+      const P::FilterJoinPtr &filter_join =
+          std::static_pointer_cast<const P::FilterJoin>(physical_plan);
+      double left_selectivity = estimateSelectivity(filter_join->left());
+      double right_selectivity = estimateSelectivity(filter_join->right());
+      double build_side_filter_selectivity =
+          estimateSelectivityForPredicate(filter_join->build_side_filter_predicate(),
+                                          filter_join->right());
+      return left_selectivity * right_selectivity * build_side_filter_selectivity;
+    }
     case P::PhysicalType::kHashJoin: {
       const P::HashJoinPtr &hash_join =
           std::static_pointer_cast<const P::HashJoin>(physical_plan);
@@ -383,18 +422,124 @@ double StarSchemaSimpleCostModel::estimateSelectivityForPredicate(
 std::size_t StarSchemaSimpleCostModel::getNumDistinctValues(
     const E::ExprId attribute_id,
     const P::TableReferencePtr &table_reference) {
-  const CatalogRelation &relation = *table_reference->relation();
-  const std::vector<E::AttributeReferencePtr> &attributes = table_reference->attribute_list();
-  for (std::size_t i = 0; i < attributes.size(); ++i) {
-    if (attributes[i]->id() == attribute_id) {
-      const CatalogRelationStatistics &stat = relation.getStatistics();
-      if (stat.hasNumDistinctValues(i)) {
-        return stat.getNumDistinctValues(i);
+  const auto rel_attr_id =
+      findCatalogRelationAttributeId(table_reference, attribute_id);
+  if (rel_attr_id != kInvalidAttributeID) {
+    const CatalogRelationStatistics &stat =
+        table_reference->relation()->getStatistics();
+    if (stat.hasNumDistinctValues(rel_attr_id)) {
+      return stat.getNumDistinctValues(rel_attr_id);
+    }
+  }
+  return estimateCardinalityForTableReference(table_reference);
+}
+
+bool StarSchemaSimpleCostModel::impliesUniqueAttributes(
+    const P::PhysicalPtr &physical_plan,
+    const std::vector<E::AttributeReferencePtr> &attributes) {
+  switch (physical_plan->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr &aggregate =
+          std::static_pointer_cast<const P::Aggregate>(physical_plan);
+      return E::SubsetOfExpressions(aggregate->grouping_expressions(), attributes);
+    }
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr &hash_join =
+          std::static_pointer_cast<const P::HashJoin>(physical_plan);
+      bool unique_from_left =
+          impliesUniqueAttributes(hash_join->right(), hash_join->right_join_attributes())
+              && impliesUniqueAttributes(hash_join->left(), attributes);
+      bool unique_from_right =
+          impliesUniqueAttributes(hash_join->left(), hash_join->left_join_attributes())
+              && impliesUniqueAttributes(hash_join->right(), attributes);
+      return unique_from_left || unique_from_right;
+    }
+    case P::PhysicalType::kTableReference: {
+      const P::TableReferencePtr &table_reference =
+          std::static_pointer_cast<const P::TableReference>(physical_plan);
+      const CatalogRelationStatistics &stat =
+          table_reference->relation()->getStatistics();
+      if (stat.hasNumTuples()) {
+        const std::size_t num_tuples = stat.getNumTuples();
+        for (const auto &attr : attributes) {
+          const attribute_id rel_attr_id =
+              findCatalogRelationAttributeId(table_reference, attr->id());
+          if (rel_attr_id != kInvalidAttributeID &&
+              stat.hasNumDistinctValues(rel_attr_id) &&
+              stat.getNumDistinctValues(rel_attr_id) == num_tuples) {
+            return true;
+          }
+        }
       }
+      return false;
+    }
+    case P::PhysicalType::kSample:  // Fall through
+    case P::PhysicalType::kSelection:
+    case P::PhysicalType::kSort: {
+      DCHECK_EQ(physical_plan->getNumChildren(), 1u);
+      return impliesUniqueAttributes(physical_plan->children()[0], attributes);
+    }
+    default:
       break;
+  }
+  return false;
+}
+
+TypedValue StarSchemaSimpleCostModel::findCatalogRelationStat(
+    const P::PhysicalPtr &physical_plan,
+    const E::ExprId attr_id,
+    const StatType stat_type,
+    bool *is_exact_stat) {
+  P::TableReferencePtr table_reference;
+  if (P::SomeTableReference::MatchesWithConditionalCast(physical_plan, &table_reference)) {
+    const attribute_id rel_attr_id =
+        findCatalogRelationAttributeId(table_reference, attr_id);
+    if (rel_attr_id != kInvalidAttributeID) {
+      const CatalogRelationStatistics &stat =
+          table_reference->relation()->getStatistics();
+
+      if (is_exact_stat != nullptr) {
+        *is_exact_stat = stat.isExact();
+      }
+
+      switch (stat_type) {
+        case StatType::kMin: {
+          if (stat.hasMinValue(rel_attr_id)) {
+            return stat.getMinValue(rel_attr_id);
+          }
+          break;
+        }
+        case StatType::kMax: {
+          if (stat.hasMaxValue(rel_attr_id)) {
+            return stat.getMaxValue(rel_attr_id);
+          }
+          break;
+        }
+        default:
+          break;
+      }
+      return NullType::InstanceNullable().makeNullValue();
     }
   }
-  return estimateCardinalityForTableReference(table_reference);
+
+  for (const auto &child : physical_plan->children()) {
+    if (E::ContainsExprId(child->getOutputAttributes(), attr_id)) {
+      return findCatalogRelationStat(child, attr_id, stat_type, is_exact_stat);
+    }
+  }
+  return NullType::InstanceNullable().makeNullValue();
+}
+
+attribute_id StarSchemaSimpleCostModel::findCatalogRelationAttributeId(
+    const physical::TableReferencePtr &table_reference,
+    const expressions::ExprId expr_id) {
+  const auto &attribute_list = table_reference->attribute_list();
+  for (std::size_t i = 0; i < attribute_list.size(); ++i) {
+    if (attribute_list[i]->id() == expr_id) {
+      return i;
+    }
+  }
+  return kInvalidAttributeID;
 }
 
 }  // namespace cost

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
index 6f6aa29..cbe18f4 100644
--- a/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
+++ b/query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp
@@ -23,11 +23,14 @@
 #include <cstddef>
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_optimizer/cost_model/CostModel.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/physical/Aggregate.hpp"
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/FilterJoin.hpp"
 #include "query_optimizer/physical/HashJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/Selection.hpp"
@@ -36,6 +39,7 @@
 #include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/physical/TopLevelPlan.hpp"
 #include "query_optimizer/physical/WindowAggregate.hpp"
+#include "types/TypedValue.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -105,10 +109,70 @@ class StarSchemaSimpleCostModel : public CostModel {
   double estimateSelectivityForFilterPredicate(
       const physical::PhysicalPtr &physical_plan);
 
+  /**
+   * @brief Check whether a set of attributes are unique (i.e. have distinct
+   *        values) for a relation.
+   *
+   * @param physical_plan The physical plan that corresponds to a relation.
+   * @param attributes The set of attributes to be checked. Note that each
+   *        attribute in this set must be an output attribute of the physical
+   *        plan.
+   * @return True if it is guaranteed that the attributes are unique; false
+   *         otherwise.
+   */
+  bool impliesUniqueAttributes(
+      const physical::PhysicalPtr &physical_plan,
+      const std::vector<expressions::AttributeReferencePtr> &attributes);
+
+  /**
+   * @brief For a physical plan attribute, find its correponding catalog attribute's
+   *        MIN statistic. Returns Null value if there is no corresponding catalog
+   *        attribute for the physical plan attribute.
+   *
+   * @param physical_plan The physical plan.
+   * @param attribute The attribute. Must be an output attribute of the given
+   *        physical plan.
+   * @param is_exact_stat If this pointer is not null, its pointed content will
+   *        be modified by this method to indicate whether the returned statistic
+   *        is EXACT for the stored relation (i.e. not outdated or estimated).
+   * @return The MIN statistic for the attribute.
+   */
+  TypedValue findMinValueStat(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      bool *is_exact_stat = nullptr) {
+    return findCatalogRelationStat(
+        physical_plan, attribute->id(), StatType::kMin, is_exact_stat);
+  }
+
+  /**
+   * @brief For a physical plan attribute, find its correponding catalog attribute's
+   *        MAX statistic. Returns Null value if there is no corresponding catalog
+   *        attribute for the physical plan attribute.
+   *
+   * @param physical_plan The physical plan.
+   * @param attribute The attribute. Must be an output attribute of the given
+   *        physical plan.
+   * @param is_exact_stat If this pointer is not null, its pointed content will
+   *        be modified by this method to indicate whether the returned statistic
+   *        is EXACT for the stored relation (i.e. not not outdated or estimated).
+   * @return The MAX statistic for the attribute.
+   */
+  TypedValue findMaxValueStat(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::AttributeReferencePtr &attribute,
+      bool *is_exact_stat = nullptr) {
+    return findCatalogRelationStat(
+        physical_plan, attribute->id(), StatType::kMax, is_exact_stat);
+  }
+
  private:
   std::size_t estimateCardinalityForAggregate(
       const physical::AggregatePtr &physical_plan);
 
+  std::size_t estimateCardinalityForFilterJoin(
+      const physical::FilterJoinPtr &physical_plan);
+
   std::size_t estimateCardinalityForHashJoin(
       const physical::HashJoinPtr &physical_plan);
 
@@ -144,6 +208,25 @@ class StarSchemaSimpleCostModel : public CostModel {
   std::size_t getNumDistinctValues(const expressions::ExprId attribute_id,
                                    const physical::TableReferencePtr &table_reference);
 
+  enum class StatType {
+    kMax = 0,
+    kMin
+  };
+
+  // For a physical plan attribute, find its correponding catalog attribute's
+  // min/max statistics. Returns Null value if there is no corresponding catalog
+  // attribute for the physical plan attribute (e.g. the attribute is the result
+  // of an expression).
+  TypedValue findCatalogRelationStat(
+      const physical::PhysicalPtr &physical_plan,
+      const expressions::ExprId expr_id,
+      const StatType stat_type,
+      bool *is_exact_stat);
+
+  // For a table reference attribute, find its correponding catalog attribute.
+  attribute_id findCatalogRelationAttributeId(
+      const physical::TableReferencePtr &table_reference,
+      const expressions::ExprId expr_id);
 
   DISALLOW_COPY_AND_ASSIGN(StarSchemaSimpleCostModel);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/expressions/ExpressionUtil.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/expressions/ExpressionUtil.hpp b/query_optimizer/expressions/ExpressionUtil.hpp
index 422d5ab..6b8666e 100644
--- a/query_optimizer/expressions/ExpressionUtil.hpp
+++ b/query_optimizer/expressions/ExpressionUtil.hpp
@@ -122,12 +122,12 @@ bool ContainsExprId(
  *              contain the other operand).
  * @return True if \p left is a subset of \p right.
  */
-template <class NamedExpressionType>
+template <class LeftNamedExpressionType, class RightNamedExpressionType>
 bool SubsetOfExpressions(
-    const std::vector<std::shared_ptr<const NamedExpressionType>> &left,
-    const std::vector<std::shared_ptr<const NamedExpressionType>> &right) {
+    const std::vector<std::shared_ptr<const LeftNamedExpressionType>> &left,
+    const std::vector<std::shared_ptr<const RightNamedExpressionType>> &right) {
   UnorderedNamedExpressionSet supset(right.begin(), right.end());
-  for (const std::shared_ptr<const NamedExpressionType> &expr : left) {
+  for (const std::shared_ptr<const LeftNamedExpressionType> &expr : left) {
     if (supset.find(expr) == supset.end()) {
       return false;
     }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/physical/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index 7f26943..f68ed39 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -23,6 +23,7 @@ add_library(quickstep_queryoptimizer_physical_CreateIndex CreateIndex.cpp Create
 add_library(quickstep_queryoptimizer_physical_CreateTable CreateTable.cpp CreateTable.hpp)
 add_library(quickstep_queryoptimizer_physical_DeleteTuples DeleteTuples.cpp DeleteTuples.hpp)
 add_library(quickstep_queryoptimizer_physical_DropTable DropTable.cpp DropTable.hpp)
+add_library(quickstep_queryoptimizer_physical_FilterJoin FilterJoin.cpp FilterJoin.hpp)
 add_library(quickstep_queryoptimizer_physical_HashJoin HashJoin.cpp HashJoin.hpp)
 add_library(quickstep_queryoptimizer_physical_InsertSelection InsertSelection.cpp InsertSelection.hpp)
 add_library(quickstep_queryoptimizer_physical_InsertTuple InsertTuple.cpp InsertTuple.hpp)
@@ -115,6 +116,18 @@ target_link_libraries(quickstep_queryoptimizer_physical_DropTable
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_physical_FilterJoin
+                      glog
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_BinaryJoin
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_utility_Cast
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_physical_HashJoin
                       glog
                       quickstep_queryoptimizer_OptimizerTree
@@ -282,6 +295,7 @@ target_link_libraries(quickstep_queryoptimizer_physical
                       quickstep_queryoptimizer_physical_CreateTable
                       quickstep_queryoptimizer_physical_DeleteTuples
                       quickstep_queryoptimizer_physical_DropTable
+                      quickstep_queryoptimizer_physical_FilterJoin
                       quickstep_queryoptimizer_physical_HashJoin
                       quickstep_queryoptimizer_physical_InsertSelection
                       quickstep_queryoptimizer_physical_InsertTuple

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/physical/FilterJoin.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/FilterJoin.cpp b/query_optimizer/physical/FilterJoin.cpp
new file mode 100644
index 0000000..1817a1c
--- /dev/null
+++ b/query_optimizer/physical/FilterJoin.cpp
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/physical/FilterJoin.hpp"
+
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "utility/Cast.hpp"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+namespace E = ::quickstep::optimizer::expressions;
+
+std::vector<E::AttributeReferencePtr> FilterJoin::getReferencedAttributes() const {
+  std::vector<E::AttributeReferencePtr> referenced_attributes;
+  for (const auto &project_expression : project_expressions()) {
+    const auto referenced_attributes_in_expression =
+        project_expression->getReferencedAttributes();
+    referenced_attributes.insert(referenced_attributes.end(),
+                                 referenced_attributes_in_expression.begin(),
+                                 referenced_attributes_in_expression.end());
+  }
+  referenced_attributes.insert(referenced_attributes.end(),
+                               probe_attributes_.begin(),
+                               probe_attributes_.end());
+  referenced_attributes.insert(referenced_attributes.end(),
+                               build_attributes_.begin(),
+                               build_attributes_.end());
+  if (build_side_filter_predicate_ != nullptr) {
+    const auto referenced_attributes_in_predicate =
+        build_side_filter_predicate_->getReferencedAttributes();
+    referenced_attributes.insert(referenced_attributes.end(),
+                                 referenced_attributes_in_predicate.begin(),
+                                 referenced_attributes_in_predicate.end());
+  }
+  return referenced_attributes;
+}
+
+bool FilterJoin::maybeCopyWithPrunedExpressions(
+    const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+    PhysicalPtr *output) const {
+  std::vector<E::NamedExpressionPtr> new_project_expressions;
+  const auto &current_project_expressions = project_expressions();
+  for (const auto &project_expression : current_project_expressions) {
+    if (referenced_expressions.find(project_expression) != referenced_expressions.end()) {
+      new_project_expressions.emplace_back(project_expression);
+    }
+  }
+  if (new_project_expressions.size() != current_project_expressions.size()) {
+    *output = Create(left(),
+                     right(),
+                     probe_attributes_,
+                     build_attributes_,
+                     new_project_expressions,
+                     build_side_filter_predicate_,
+                     is_anti_join_);
+    return true;
+  }
+  return false;
+}
+
+void FilterJoin::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  BinaryJoin::getFieldStringItems(inline_field_names,
+                                  inline_field_values,
+                                  non_container_child_field_names,
+                                  non_container_child_fields,
+                                  container_child_field_names,
+                                  container_child_fields);
+
+  inline_field_names->push_back("is_anti_join");
+  inline_field_values->push_back(std::to_string(is_anti_join_));
+
+  if (build_side_filter_predicate_ != nullptr) {
+    non_container_child_field_names->emplace_back("build_side_filter_predicate");
+    non_container_child_fields->emplace_back(build_side_filter_predicate_);
+  }
+
+  container_child_field_names->push_back("probe_attributes");
+  container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(probe_attributes_));
+  container_child_field_names->push_back("build_attributes");
+  container_child_fields->push_back(CastSharedPtrVector<OptimizerTreeBase>(build_attributes_));
+}
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/physical/FilterJoin.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/FilterJoin.hpp b/query_optimizer/physical/FilterJoin.hpp
new file mode 100644
index 0000000..ad4e18b
--- /dev/null
+++ b/query_optimizer/physical/FilterJoin.hpp
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_FILTER_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_FILTER_JOIN_HPP_
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/physical/BinaryJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace physical {
+
+/** \addtogroup OptimizerPhysical
+ *  @{
+ */
+
+class FilterJoin;
+typedef std::shared_ptr<const FilterJoin> FilterJoinPtr;
+
+/**
+ * @brief Physical filter join node. Semantically, FilterJoin is similar to
+ *        HashJoin where the difference is that FilterJoin builds a bit vector
+ *        instead of a hash table.
+ *
+ * @note FilterJoin's backend execution relies on LIPFilter injection (attach
+ *       the bit vectors as filters into downstream relational operators).
+ */
+class FilterJoin : public BinaryJoin {
+ public:
+  PhysicalType getPhysicalType() const override {
+    return PhysicalType::kFilterJoin;
+  }
+
+  std::string getName() const override {
+    if (is_anti_join_) {
+      return "FilterJoin(Anti)";
+    } else {
+      return "FilterJoin";
+    }
+  }
+
+  /**
+   * @return The probe side attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& probe_attributes() const {
+    return probe_attributes_;
+  }
+
+  /**
+   * @return The build side attributes.
+   */
+  const std::vector<expressions::AttributeReferencePtr>& build_attributes() const {
+    return build_attributes_;
+  }
+
+  /**
+   * @return The build side filter predicate.
+   */
+  const expressions::PredicatePtr& build_side_filter_predicate() const {
+    return build_side_filter_predicate_;
+  }
+
+  /**
+   * @return Whether this is an anti-join.
+   */
+  const bool is_anti_join() const {
+    return is_anti_join_;
+  }
+
+  PhysicalPtr copyWithNewChildren(
+      const std::vector<PhysicalPtr> &new_children) const override {
+    DCHECK_EQ(children().size(), new_children.size());
+    return Create(new_children[0],
+                  new_children[1],
+                  probe_attributes_,
+                  build_attributes_,
+                  project_expressions(),
+                  build_side_filter_predicate_,
+                  is_anti_join_);
+  }
+
+  std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
+
+  bool maybeCopyWithPrunedExpressions(
+      const expressions::UnorderedNamedExpressionSet &referenced_expressions,
+      PhysicalPtr *output) const override;
+
+  /**
+   * @brief Creates a physical FilterJoin.
+   * @param probe_child The probe side child plan.
+   * @param build_child The build side child plan.
+   * @param probe_attributes The probe side attributes.
+   * @param build_attributes The build side attributes.
+   * @param project_expressions The project expressions.
+   * @param build_side_filter_predicate Optional filtering predicate to be
+   *        applied to the build side child BEFORE join.
+   * @param is_anti_join Whether this is an anti-join.
+   * @return An immutable physical FilterJoin.
+   */
+  static FilterJoinPtr Create(
+      const PhysicalPtr &probe_child,
+      const PhysicalPtr &build_child,
+      const std::vector<expressions::AttributeReferencePtr> &probe_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &build_attributes,
+      const std::vector<expressions::NamedExpressionPtr> &project_expressions,
+      const expressions::PredicatePtr &build_side_filter_predicate,
+      const bool is_anti_join) {
+    return FilterJoinPtr(
+        new FilterJoin(probe_child,
+                       build_child,
+                       probe_attributes,
+                       build_attributes,
+                       project_expressions,
+                       build_side_filter_predicate,
+                       is_anti_join));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  FilterJoin(
+      const PhysicalPtr &probe_child,
+      const PhysicalPtr &build_child,
+      const std::vector<expressions::AttributeReferencePtr> &probe_attributes,
+      const std::vector<expressions::AttributeReferencePtr> &build_attributes,
+      const std::vector<expressions::NamedExpressionPtr> &project_expressions,
+      const expressions::PredicatePtr &build_side_filter_predicate,
+      const bool is_anti_join)
+      : BinaryJoin(probe_child, build_child, project_expressions),
+        probe_attributes_(probe_attributes),
+        build_attributes_(build_attributes),
+        build_side_filter_predicate_(build_side_filter_predicate),
+        is_anti_join_(is_anti_join) {
+  }
+
+  std::vector<expressions::AttributeReferencePtr> probe_attributes_;
+  std::vector<expressions::AttributeReferencePtr> build_attributes_;
+  expressions::PredicatePtr build_side_filter_predicate_;
+  bool is_anti_join_;
+
+  DISALLOW_COPY_AND_ASSIGN(FilterJoin);
+};
+
+/** @} */
+
+}  // namespace physical
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_FILTER_JOIN_HPP_

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/physical/LIPFilterConfiguration.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/LIPFilterConfiguration.hpp b/query_optimizer/physical/LIPFilterConfiguration.hpp
index 62a6149..90c81fe 100644
--- a/query_optimizer/physical/LIPFilterConfiguration.hpp
+++ b/query_optimizer/physical/LIPFilterConfiguration.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_QUERY_OPTIMIZER_PHYSICAL_LIP_FILTER_CONFIGURATION_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <map>
 #include <memory>
 #include <vector>
@@ -40,50 +41,211 @@ namespace physical {
 class Physical;
 typedef std::shared_ptr<const Physical> PhysicalPtr;
 
+class LIPFilterBuildInfo;
+typedef std::shared_ptr<const LIPFilterBuildInfo> LIPFilterBuildInfoPtr;
+
+class LIPFilterProbeInfo;
+typedef std::shared_ptr<const LIPFilterProbeInfo> LIPFilterProbeInfoPtr;
+
 /**
  * @brief Optimizer information for a LIP filter builder.
  */
-struct LIPFilterBuildInfo {
+class LIPFilterBuildInfo {
+ public:
+  /**
+   * @return The LIPFilter's type.
+   */
+  LIPFilterType filter_type() const {
+    return filter_type_;
+  }
+
+  /**
+   * @return The LIPFilter's build attribute.
+   */
+  const expressions::AttributeReferencePtr& build_attribute() const {
+    return build_attribute_;
+  }
+
+ protected:
   /**
    * @brief Constructor.
    *
-   * @param build_attribute_in The attribute to build the LIP filter with.
-   * @param filter_cardinality_in The LIP filter's cardinality.
    * @param filter_type_in The LIP filter's type.
+   * @param build_attribute_in The attribute to build the LIP filter with.
+   */
+  LIPFilterBuildInfo(const LIPFilterType &filter_type,
+                     const expressions::AttributeReferencePtr &build_attribute)
+      : filter_type_(filter_type),
+        build_attribute_(build_attribute) {}
+
+ private:
+  const LIPFilterType filter_type_;
+  const expressions::AttributeReferencePtr build_attribute_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterBuildInfo);
+};
+
+/**
+ * @brief Subclass that contains extra information for SingleIdentityHashFilter
+ *        builder.
+ */
+class SingleIdentityHashFilterBuildInfo : public LIPFilterBuildInfo {
+ public:
+  /**
+   * @return The cardinality of this SingleIdentityHashFilter.
    */
-  LIPFilterBuildInfo(const expressions::AttributeReferencePtr &build_attribute_in,
-                     const std::size_t filter_cardinality_in,
-                     const LIPFilterType &filter_type_in)
-      : build_attribute(build_attribute_in),
-        filter_cardinality(filter_cardinality_in),
-        filter_type(filter_type_in) {
+  std::size_t filter_cardinality() const {
+    return filter_cardinality_;
   }
-  const expressions::AttributeReferencePtr build_attribute;
-  const std::size_t filter_cardinality;
-  const LIPFilterType filter_type;
+
+  /**
+   * @brief Creates a shared SingleIdentityHashFilterBuildInfo.
+   *
+   * @param build_attribute The attribute to build the filter with.
+   * @param filter_cardinality The cardinality of this SingleIdentityHashFilter.
+   */
+  static LIPFilterBuildInfoPtr Create(
+      const expressions::AttributeReferencePtr &build_attribute,
+      const std::size_t filter_cardinality) {
+    return LIPFilterBuildInfoPtr(
+        new SingleIdentityHashFilterBuildInfo(build_attribute,
+                                              filter_cardinality));
+  }
+
+ private:
+  SingleIdentityHashFilterBuildInfo(const expressions::AttributeReferencePtr &build_attribute,
+                                    const std::size_t filter_cardinality)
+      : LIPFilterBuildInfo(LIPFilterType::kSingleIdentityHashFilter,
+                           build_attribute),
+        filter_cardinality_(filter_cardinality) {}
+
+  const std::size_t filter_cardinality_;
+
+  DISALLOW_COPY_AND_ASSIGN(SingleIdentityHashFilterBuildInfo);
 };
 
 /**
+ * @brief Subclass that contains extra information for BitVectorExactFilter
+ *        builder.
+ */
+class BitVectorExactFilterBuildInfo : public LIPFilterBuildInfo {
+ public:
+  /**
+   * @return The minimum possible value for this BitVectorExactFilter.
+   */
+  std::int64_t min_value() const {
+    return min_value_;
+  }
+
+  /**
+   * @return The maximum possible value for this BitVectorExactFilter.
+   */
+  std::int64_t max_value() const {
+    return max_value_;
+  }
+
+  /**
+   * @return Whether this is an anti-filter.
+   */
+  bool is_anti_filter() const {
+    return is_anti_filter_;
+  }
+
+  /**
+   * @brief Creates a shared BitVectorExactFilterBuildInfo.
+   *
+   * @param build_attribute The attribute to build the filter with.
+   * @param min_value The minimum possible value for this BitVectorExactFilter
+   *        to set.
+   * @param max_value The maximum possible value for this BitVectorExactFilter
+   *        to set.
+   * @param is_anti_filter Whether this is an anti-filter.
+   */
+  static LIPFilterBuildInfoPtr Create(
+      const expressions::AttributeReferencePtr &build_attribute,
+      const std::int64_t min_value,
+      const std::int64_t max_value,
+      const bool is_anti_filter) {
+    return LIPFilterBuildInfoPtr(
+        new BitVectorExactFilterBuildInfo(build_attribute,
+                                          min_value,
+                                          max_value,
+                                          is_anti_filter));
+  }
+
+ private:
+  BitVectorExactFilterBuildInfo(const expressions::AttributeReferencePtr &build_attribute,
+                                const std::int64_t min_value,
+                                const std::int64_t max_value,
+                                const bool is_anti_filter)
+      : LIPFilterBuildInfo(LIPFilterType::kBitVectorExactFilter,
+                           build_attribute),
+        min_value_(min_value),
+        max_value_(max_value),
+        is_anti_filter_(is_anti_filter) {}
+
+  const std::int64_t min_value_;
+  const std::int64_t max_value_;
+  const bool is_anti_filter_;
+
+  DISALLOW_COPY_AND_ASSIGN(BitVectorExactFilterBuildInfo);
+};
+
+
+/**
  * @brief Optimizer information for a LIP filter prober.
  */
-struct LIPFilterProbeInfo {
+class LIPFilterProbeInfo {
+ public:
   /**
-   * @brief Constructor.
+   * @return The attribute to probe the LIP Filter with.
+   */
+  const expressions::AttributeReferencePtr& probe_attribute() const {
+    return probe_attribute_;
+  }
+
+  /**
+   * @return The attribute that the LIP filter is built with.
+   */
+  const expressions::AttributeReferencePtr& build_attribute() const {
+    return build_attribute_;
+  }
+
+  /**
+   * @return The physical node that the LIP filter's builder is attached to.
+   */
+  const PhysicalPtr& builder() const {
+    return builder_;
+  }
+
+  /**
+   * @brief Creates a shared LIPFilterProbeInfo.
    *
-   * @param probe_attribute_in The attribute to probe the LIP filter with.
-   * @param build_attribute_in The attribute that the LIP filter is built with.
-   * @param builder_in The physical node that the LIP filter's builder is attached to.
-   */
-  LIPFilterProbeInfo(const expressions::AttributeReferencePtr &probe_attribute_in,
-                     const expressions::AttributeReferencePtr &build_attribute_in,
-                     const PhysicalPtr &builder_in)
-      : probe_attribute(probe_attribute_in),
-        build_attribute(build_attribute_in),
-        builder(builder_in) {
-  }
-  const expressions::AttributeReferencePtr probe_attribute;
-  const expressions::AttributeReferencePtr build_attribute;
-  const PhysicalPtr builder;
+   * @param probe_attribute The attribute to probe the LIP filter with.
+   * @param build_attribute The attribute that the LIP filter is built with.
+   * @param builder The physical node that the LIP filter's builder is attached to.
+   */
+  static LIPFilterProbeInfoPtr Create(
+      const expressions::AttributeReferencePtr &probe_attribute,
+      const expressions::AttributeReferencePtr &build_attribute,
+      const PhysicalPtr &builder) {
+    return LIPFilterProbeInfoPtr(
+        new LIPFilterProbeInfo(probe_attribute, build_attribute, builder));
+  }
+
+ private:
+  LIPFilterProbeInfo(const expressions::AttributeReferencePtr &probe_attribute,
+                     const expressions::AttributeReferencePtr &build_attribute,
+                     const PhysicalPtr &builder)
+      : probe_attribute_(probe_attribute),
+        build_attribute_(build_attribute),
+        builder_(builder) {}
+
+  const expressions::AttributeReferencePtr probe_attribute_;
+  const expressions::AttributeReferencePtr build_attribute_;
+  const PhysicalPtr builder_;
+
+  DISALLOW_COPY_AND_ASSIGN(LIPFilterProbeInfo);
 };
 
 
@@ -104,33 +266,23 @@ class LIPFilterConfiguration {
   /**
    * @brief Add information for a LIP filter builder.
    *
-   * @param build_attribute The attribute to build the LIP filter with.
+   * @param build_info A shared_ptr to LIPFilterBuildInfo.
    * @param builder The physical node to attach the LIP filter builder to.
-   * @param filter_size The LIP filter's cardinality.
-   * @param filter_type The LIP filter's type.
    */
-  void addBuildInfo(const expressions::AttributeReferencePtr &build_attribute,
-                    const PhysicalPtr &builder,
-                    const std::size_t filter_size,
-                    const LIPFilterType &filter_type) {
-    build_info_map_[builder].emplace_back(
-        build_attribute, filter_size, filter_type);
+  void addBuildInfo(const LIPFilterBuildInfoPtr &build_info,
+                    const PhysicalPtr &builder) {
+    build_info_map_[builder].emplace_back(build_info);
   }
 
   /**
    * @brief Add information for a LIP filter prober.
    *
-   * @param probe_attribute The attribute to probe the LIP filter with.
+   * @param probe_info A shared_ptr to LIPFilterProbeInfo.
    * @param prober The physical node to attach the LIP filter prober to.
-   * @param build_attribute The attribute that the LIP filter is built with.
-   * @param builder The physical node that the LIP filter's builder is attached to.
    */
-  void addProbeInfo(const expressions::AttributeReferencePtr &probe_attribute,
-                    const PhysicalPtr &prober,
-                    const expressions::AttributeReferencePtr &build_attribute,
-                    const PhysicalPtr &builder) {
-    probe_info_map_[prober].emplace_back(
-        probe_attribute, build_attribute, builder);
+  void addProbeInfo(const LIPFilterProbeInfoPtr &probe_info,
+                    const PhysicalPtr &prober) {
+    probe_info_map_[prober].emplace_back(probe_info);
   }
 
   /**
@@ -140,7 +292,7 @@ class LIPFilterConfiguration {
    *         a vector of all the LIP filter builders that are attached to the
    *         physical node.
    */
-  const std::map<PhysicalPtr, std::vector<LIPFilterBuildInfo>>& getBuildInfoMap() const {
+  const std::map<PhysicalPtr, std::vector<LIPFilterBuildInfoPtr>>& getBuildInfoMap() const {
     return build_info_map_;
   }
 
@@ -151,13 +303,26 @@ class LIPFilterConfiguration {
    *         a vector of all the LIP filter probers that are attached to the
    *         physical node.
    */
-  const std::map<PhysicalPtr, std::vector<LIPFilterProbeInfo>>& getProbeInfoMap() const {
+  const std::map<PhysicalPtr, std::vector<LIPFilterProbeInfoPtr>>& getProbeInfoMap() const {
     return probe_info_map_;
   }
 
+  /**
+   * @brief Clone a copy of this configuration.
+   *
+   * @return A copy of this confiugration. Caller should take ownership of the
+   *         returned object.
+   */
+  LIPFilterConfiguration* clone() const {
+    LIPFilterConfiguration *new_conf = new LIPFilterConfiguration();
+    new_conf->build_info_map_ = build_info_map_;
+    new_conf->probe_info_map_ = probe_info_map_;
+    return new_conf;
+  }
+
  private:
-  std::map<PhysicalPtr, std::vector<LIPFilterBuildInfo>> build_info_map_;
-  std::map<PhysicalPtr, std::vector<LIPFilterProbeInfo>> probe_info_map_;
+  std::map<PhysicalPtr, std::vector<LIPFilterBuildInfoPtr>> build_info_map_;
+  std::map<PhysicalPtr, std::vector<LIPFilterProbeInfoPtr>> probe_info_map_;
 
   DISALLOW_COPY_AND_ASSIGN(LIPFilterConfiguration);
 };

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/physical/PatternMatcher.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PatternMatcher.hpp b/query_optimizer/physical/PatternMatcher.hpp
index 5cd6fd3..4336767 100644
--- a/query_optimizer/physical/PatternMatcher.hpp
+++ b/query_optimizer/physical/PatternMatcher.hpp
@@ -35,6 +35,7 @@ class CopyFrom;
 class CreateTable;
 class DeleteTuples;
 class DropTable;
+class FilterJoin;
 class HashJoin;
 class InsertTuple;
 class Join;
@@ -113,6 +114,7 @@ using SomeCopyFrom = SomePhysicalNode<CopyFrom, PhysicalType::kCopyFrom>;
 using SomeCreateTable = SomePhysicalNode<CreateTable, PhysicalType::kCreateTable>;
 using SomeDeleteTuples = SomePhysicalNode<DeleteTuples, PhysicalType::kDeleteTuples>;
 using SomeDropTable = SomePhysicalNode<DropTable, PhysicalType::kDropTable>;
+using SomeFilterJoin = SomePhysicalNode<FilterJoin, PhysicalType::kFilterJoin>;
 using SomeHashJoin = SomePhysicalNode<HashJoin, PhysicalType::kHashJoin>;
 using SomeInsertTuple = SomePhysicalNode<InsertTuple, PhysicalType::kInsertTuple>;
 using SomeJoin = SomePhysicalNode<Join, PhysicalType::kHashJoin, PhysicalType::kNestedLoopsJoin>;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/physical/PhysicalType.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/PhysicalType.hpp b/query_optimizer/physical/PhysicalType.hpp
index f5f35a1..1da5929 100644
--- a/query_optimizer/physical/PhysicalType.hpp
+++ b/query_optimizer/physical/PhysicalType.hpp
@@ -38,6 +38,7 @@ enum class PhysicalType {
   kCreateTable,
   kDeleteTuples,
   kDropTable,
+  kFilterJoin,
   kHashJoin,
   kInsertSelection,
   kInsertTuple,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/physical/TopLevelPlan.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/physical/TopLevelPlan.hpp b/query_optimizer/physical/TopLevelPlan.hpp
index 7dfc2b6..9e567e1 100644
--- a/query_optimizer/physical/TopLevelPlan.hpp
+++ b/query_optimizer/physical/TopLevelPlan.hpp
@@ -126,7 +126,8 @@ class TopLevelPlan : public Physical {
     }
     return TopLevelPlan::Create(new_children[0],
                                 new_shared_subplans,
-                                uncorrelated_subquery_map_);
+                                uncorrelated_subquery_map_,
+                                lip_filter_configuration_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/AttachLIPFilters.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/AttachLIPFilters.cpp b/query_optimizer/rules/AttachLIPFilters.cpp
index b3c57ab..48b68bc 100644
--- a/query_optimizer/rules/AttachLIPFilters.cpp
+++ b/query_optimizer/rules/AttachLIPFilters.cpp
@@ -55,7 +55,14 @@ P::PhysicalPtr AttachLIPFilters::apply(const P::PhysicalPtr &input) {
   cost_model_.reset(
       new cost::StarSchemaSimpleCostModel(
           top_level_plan->shared_subplans()));
-  lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
+
+  const P::LIPFilterConfigurationPtr &existing_configuration =
+      top_level_plan->lip_filter_configuration();
+  if (existing_configuration != nullptr) {
+    lip_filter_configuration_.reset(existing_configuration->clone());
+  } else {
+    lip_filter_configuration_.reset(new P::LIPFilterConfiguration());
+  }
 
   std::set<E::ExprId> already_filtered_attributes;
   attachLIPFilters(NodeList(input), &already_filtered_attributes);
@@ -101,7 +108,7 @@ void AttachLIPFilters::attachLIPFilters(
   }
 
   if (probe_child != nullptr &&
-      cost_model_->estimateCardinality(probe_child) > 10000000) {
+      cost_model_->estimateCardinality(probe_child) >= 100000) {
     const auto &candidate_lip_filters = getProbeSideInfo(path.cons(probe_child));
     if (!candidate_lip_filters.empty()) {
       std::map<E::AttributeReferencePtr, LIPFilterInfoPtr> selected_filters;
@@ -119,15 +126,16 @@ void AttachLIPFilters::attachLIPFilters(
         if (already_filtered_attributes->find(source_attr_id)
                 == already_filtered_attributes->end()) {
           lip_filter_configuration_->addBuildInfo(
-              pair.second->source_attribute,
-              pair.second->source,
-              pair.second->estimated_cardinality * 8,
-              LIPFilterType::kSingleIdentityHashFilter);
-          lip_filter_configuration_->addProbeInfo(
-              pair.first,
-              node,
-              pair.second->source_attribute,
+              P::SingleIdentityHashFilterBuildInfo::Create(
+                  pair.second->source_attribute,
+                  pair.second->estimated_cardinality * 8),
               pair.second->source);
+          lip_filter_configuration_->addProbeInfo(
+              P::LIPFilterProbeInfo::Create(
+                  pair.first,
+                  pair.second->source_attribute,
+                  pair.second->source),
+              node);
           already_filtered_attributes->emplace(source_attr_id);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4ba819c5/query_optimizer/rules/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 86d1ef7..223c78c 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -22,6 +22,7 @@ add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
+add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownFilter PushDownFilter.cpp PushDownFilter.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
@@ -196,6 +197,26 @@ target_link_libraries(quickstep_queryoptimizer_rules_SwapProbeBuild
 target_link_libraries(quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_InjectJoinFilters
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_FilterJoin
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_LIPFilterConfiguration
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_queryoptimizer_rules_PruneColumns
+                      quickstep_types_TypeID
+                      quickstep_types_TypedValue
+                      quickstep_utility_Macros
+                      quickstep_utility_lipfilter_LIPFilter)
 target_link_libraries(quickstep_queryoptimizer_rules_UnnestSubqueries
                       quickstep_queryoptimizer_OptimizerContext
                       quickstep_queryoptimizer_expressions_AttributeReference
@@ -246,6 +267,7 @@ target_link_libraries(quickstep_queryoptimizer_rules
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_GenerateJoins
+                      quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownFilter
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate