You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/06/13 21:13:46 UTC

incubator-quickstep git commit: QUICKSTEP-76: Enabled LIP in the distributed version.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 2a622460b -> d0c55320f


QUICKSTEP-76: Enabled LIP in the distributed version.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d0c55320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d0c55320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d0c55320

Branch: refs/heads/master
Commit: d0c55320f476141b2747dc4048735ca8acebda34
Parents: 2a62246
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri May 5 17:53:09 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Jun 13 15:23:40 2017 -0500

----------------------------------------------------------------------
 cli/distributed/QuickstepDistributedCli.cpp     |  5 --
 query_execution/CMakeLists.txt                  |  1 +
 query_execution/ForemanDistributed.cpp          | 56 ++++++++++++--
 query_execution/ForemanDistributed.hpp          |  4 +
 query_execution/PolicyEnforcerDistributed.cpp   | 20 +++++
 query_execution/PolicyEnforcerDistributed.hpp   | 35 ++++++++-
 query_execution/QueryManagerDistributed.cpp     | 61 +++++++++++++++
 query_execution/QueryManagerDistributed.hpp     | 81 ++++++++++++++++----
 query_optimizer/LIPFilterGenerator.cpp          | 17 +++-
 .../tests/DistributedExecutionGeneratorTest.cpp |  5 --
 relational_operators/AggregationOperator.cpp    |  4 +
 relational_operators/BuildHashOperator.cpp      |  4 +
 relational_operators/BuildLIPFilterOperator.cpp |  4 +
 relational_operators/HashJoinOperator.cpp       |  8 ++
 relational_operators/RelationalOperator.hpp     |  6 +-
 relational_operators/SelectOperator.cpp         |  4 +
 relational_operators/WorkOrder.proto            |  9 ++-
 17 files changed, 285 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
index 08443cd..513bedd 100644
--- a/cli/distributed/QuickstepDistributedCli.cpp
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -69,11 +69,6 @@ using quickstep::FLAGS_role;
 int main(int argc, char *argv[]) {
   google::InitGoogleLogging(argv[0]);
 
-  // TODO(quickstep-team): Fix JIRA QUICKSTEP-76 for adding LIP filter support
-  // in the distributed version.
-  quickstep::optimizer::FLAGS_use_lip_filters = false;
-  quickstep::optimizer::FLAGS_use_filter_joins = false;
-
   gflags::ParseCommandLineFlags(&argc, &argv, true);
   grpc_init();
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index c74fa36..4c3b52a 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -281,6 +281,7 @@ if (ENABLE_DISTRIBUTED)
                         quickstep_storage_StorageBlockInfo
                         quickstep_utility_DAG
                         quickstep_utility_Macros
+                        quickstep_utility_lipfilter_LIPFilter_proto
                         tmb)
 endif(ENABLE_DISTRIBUTED)
 target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index e5e0eee..fbac18e 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,11 +243,17 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
                                                        size_t *shiftboss_index_for_aggregation) {
   const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::aggregation_state_id aggr_state_index;
+  vector<QueryContext::lip_filter_id> lip_filter_indexes;
   block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
     case S::AGGREGATION:
       aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+
+      for (int i = 0; i < work_order_proto.ExtensionSize(S::AggregationWorkOrder::lip_filter_indexes); ++i) {
+        lip_filter_indexes.push_back(work_order_proto.GetExtension(S::AggregationWorkOrder::lip_filter_indexes, i));
+      }
+
       block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
       break;
     case S::FINALIZE_AGGREGATION:
@@ -261,8 +267,8 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
-      proto.query_id(), aggr_state_index, block_locator_, block, next_shiftboss_index_to_schedule,
-      shiftboss_index_for_aggregation);
+      proto.query_id(), aggr_state_index, lip_filter_indexes, block_locator_, block,
+      next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
 
   return true;
 }
@@ -273,12 +279,18 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   const S::WorkOrder &work_order_proto = proto.work_order();
   QueryContext::join_hash_table_id join_hash_table_index;
   partition_id part_id;
+  vector<QueryContext::lip_filter_id> lip_filter_indexes;
   block_id block = kInvalidBlockId;
 
   switch (work_order_proto.work_order_type()) {
     case S::BUILD_HASH:
       join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
       part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id);
+
+      for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildHashWorkOrder::lip_filter_indexes); ++i) {
+        lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildHashWorkOrder::lip_filter_indexes, i));
+      }
+
       block = work_order_proto.GetExtension(S::BuildHashWorkOrder::block_id);
       break;
     case S::HASH_JOIN:
@@ -294,8 +306,39 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
   }
 
   static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(
-      proto.query_id(), join_hash_table_index, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
-      shiftboss_index_for_hash_join);
+      proto.query_id(), join_hash_table_index, part_id, lip_filter_indexes, block_locator_, block,
+      next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join);
+
+  return true;
+}
+
+bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
+                                               const size_t next_shiftboss_index_to_schedule,
+                                               size_t *shiftboss_index_for_lip) {
+  const S::WorkOrder &work_order_proto = proto.work_order();
+  vector<QueryContext::lip_filter_id> lip_filter_indexes;
+  block_id block = kInvalidBlockId;
+
+  switch (work_order_proto.work_order_type()) {
+    case S::BUILD_LIP_FILTER:
+      for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildLIPFilterWorkOrder::lip_filter_indexes); ++i) {
+        lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::lip_filter_indexes, i));
+      }
+      block = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::build_block_id);
+      break;
+    case S::SELECT:
+      for (int i = 0; i < work_order_proto.ExtensionSize(S::SelectWorkOrder::lip_filter_indexes); ++i) {
+        lip_filter_indexes.push_back(work_order_proto.GetExtension(S::SelectWorkOrder::lip_filter_indexes, i));
+      }
+      block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+      break;
+    default:
+      return false;
+  }
+
+  static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForLip(
+      proto.query_id(), lip_filter_indexes, block_locator_, block, next_shiftboss_index_to_schedule,
+      shiftboss_index_for_lip);
 
   return true;
 }
@@ -329,10 +372,6 @@ bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
       block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
       break;
     }
-    case S::SELECT: {
-      block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
-      break;
-    }
     case S::SORT_RUN_GENERATION: {
       block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
       break;
@@ -359,6 +398,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
     if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
       // Always schedule the single-node query to the same Shiftboss.
       shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+    } else if (isLipRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
     } else if (hasBlockLocalityInfo(work_order_proto, block_locator_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index b975428..543d83f 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -98,6 +98,10 @@ class ForemanDistributed final : public ForemanBase {
                                   const std::size_t next_shiftboss_index_to_schedule,
                                   std::size_t *shiftboss_index_for_hash_join);
 
+  bool isLipRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
+                             const std::size_t next_shiftboss_index_to_schedule,
+                             std::size_t *shiftboss_index_for_lip);
+
   /**
    * @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
    *        worker threads.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index b410152..0a4fd30 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -192,6 +192,7 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
 void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
     const std::size_t query_id,
     const QueryContext::aggregation_state_id aggr_state_index,
+    const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
     const BlockLocator &block_locator,
     const block_id block,
     const std::size_t next_shiftboss_index_to_schedule,
@@ -199,6 +200,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
   DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+                                                 lip_filter_indexes,
                                                  block_locator,
                                                  block,
                                                  next_shiftboss_index_to_schedule,
@@ -209,6 +211,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
     const std::size_t query_id,
     const QueryContext::join_hash_table_id join_hash_table_index,
     const partition_id part_id,
+    const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
     const BlockLocator &block_locator,
     const block_id block,
     const std::size_t next_shiftboss_index_to_schedule,
@@ -217,12 +220,29 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
   QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
   query_manager->getShiftbossIndexForHashJoin(join_hash_table_index,
                                               part_id,
+                                              lip_filter_indexes,
                                               block_locator,
                                               block,
                                               next_shiftboss_index_to_schedule,
                                               shiftboss_index);
 }
 
+void PolicyEnforcerDistributed::getShiftbossIndexForLip(
+    const std::size_t query_id,
+    const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+    const BlockLocator &block_locator,
+    const block_id block,
+    const std::size_t next_shiftboss_index_to_schedule,
+    std::size_t *shiftboss_index) {
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+  query_manager->getShiftbossIndexForLip(lip_filter_indexes,
+                                         block_locator,
+                                         block,
+                                         next_shiftboss_index_to_schedule,
+                                         shiftboss_index);
+}
+
 void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
   S::QueryInitiateMessage proto;
   proto.set_query_id(query_handle->query_id());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index cd3a434..f44fd2e 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -126,13 +126,14 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   /**
    * @brief Get or set the index of Shiftboss for an Aggregation related
    * WorkOrder. If it is the first Aggregation on <aggr_state_index>,
-   * <shiftboss_index> will be set based on block locality if found,
-   * otherwise <next_shiftboss_index_to_schedule>.
+   * <shiftboss_index> will be set based on <lip_filter_indexes> locality or
+   * block locality if found, otherwise <next_shiftboss_index_to_schedule>.
    * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
    * has executed the first Aggregation.
    *
    * @param query_id The query id.
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -141,6 +142,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   void getShiftbossIndexForAggregation(
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
+      const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
       const BlockLocator &block_locator,
       const block_id block,
       const std::size_t next_shiftboss_index_to_schedule,
@@ -149,14 +151,15 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
   /**
    * @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
    * If it is the first BuildHash on <join_hash_table_index, part_id>,
-   * <shiftboss_index> will be set to block locality if found,
-   * otherwise <next_shiftboss_index_to_schedule>.
+   * <shiftboss_index> will be set based on <lip_filter_indexes> locality or
+   * block locality if found, otherwise <next_shiftboss_index_to_schedule>.
    * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
    * has executed the first BuildHash.
    *
    * @param query_id The query id.
    * @param join_hash_table_index The Hash Table for the Join.
    * @param part_id The partition ID.
+   * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -166,6 +169,30 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
       const std::size_t query_id,
       const QueryContext::join_hash_table_id join_hash_table_index,
       const partition_id part_id,
+      const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+      const BlockLocator &block_locator,
+      const block_id block,
+      const std::size_t next_shiftboss_index_to_schedule,
+      std::size_t *shiftboss_index);
+
+  /**
+   * @brief Get or set the index of Shiftboss for a LIP related WorkOrder.
+   * If it is the first WorkOrder on <lip_filter_indexes>,
+   * <shiftboss_index> will be set based on block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
+   * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
+   * has executed the first WorkOrder.
+   *
+   * @param query_id The query id.
+   * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
+   * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForLip(
+      const std::size_t query_id,
+      const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
       const BlockLocator &block_locator,
       const block_id block,
       const std::size_t next_shiftboss_index_to_schedule,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index b304f9f..c9780fa 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -19,9 +19,12 @@
 
 #include "query_execution/QueryManagerDistributed.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <cstdlib>
 #include <memory>
+#include <unordered_map>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -34,6 +37,7 @@
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "utility/DAG.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
 
 #include "glog/logging.h"
 
@@ -46,6 +50,7 @@ using std::malloc;
 using std::move;
 using std::size_t;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 
 namespace quickstep {
@@ -77,6 +82,62 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
     shiftboss_indexes_for_hash_joins_.push_back(
         vector<size_t>(query_context_proto.join_hash_tables(i).num_partitions(), kInvalidShiftbossIndex));
   }
+
+  computeLipFilterEquivalenceClasses(query_context_proto);
+}
+
+void QueryManagerDistributed::computeLipFilterEquivalenceClasses(
+    const serialization::QueryContext &query_context_proto) {
+  static constexpr LipFilterGroupIndex kInvalidLipFilterGroupIndex = static_cast<LipFilterGroupIndex>(-1);
+  lip_filter_groups_indexes_.resize(query_context_proto.lip_filters_size(), kInvalidLipFilterGroupIndex);
+
+  std::unordered_map<LipFilterGroupIndex, std::unordered_set<QueryContext::lip_filter_id>> lip_filter_groups;
+  for (int i = 0; i < query_context_proto.lip_filter_deployments_size(); ++i) {
+    const serialization::LIPFilterDeployment &lip_filter_deployment = query_context_proto.lip_filter_deployments(i);
+
+    unordered_set<QueryContext::lip_filter_id> lip_filter_ids;
+    for (int j = 0; j < lip_filter_deployment.build_entries_size(); ++j) {
+      lip_filter_ids.insert(lip_filter_deployment.build_entries(j).lip_filter_id());
+    }
+
+    for (int j = 0; j < lip_filter_deployment.probe_entries_size(); ++j) {
+      lip_filter_ids.insert(lip_filter_deployment.probe_entries(j).lip_filter_id());
+    }
+
+    LipFilterGroupIndex min_lip_filter_groups_index = kInvalidLipFilterGroupIndex;
+    unordered_set<LipFilterGroupIndex> lip_filter_groups_index_candidates;
+
+    for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_ids) {
+      const LipFilterGroupIndex lip_filter_groups_index = lip_filter_groups_indexes_[lip_filter_index];
+      if (lip_filter_groups_index != kInvalidLipFilterGroupIndex) {
+        if (min_lip_filter_groups_index == kInvalidLipFilterGroupIndex) {
+          min_lip_filter_groups_index = lip_filter_groups_index;
+        } else if (min_lip_filter_groups_index != lip_filter_groups_index) {
+          lip_filter_groups_index_candidates.insert(std::max(lip_filter_groups_index, min_lip_filter_groups_index));
+          min_lip_filter_groups_index = std::min(lip_filter_groups_index, min_lip_filter_groups_index);
+        }
+      }
+    }
+
+    if (min_lip_filter_groups_index == kInvalidLipFilterGroupIndex) {
+      const LipFilterGroupIndex lip_filter_groups_index = lip_filter_groups.size();
+      for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_ids) {
+        lip_filter_groups_indexes_[lip_filter_index] = lip_filter_groups_index;
+      }
+
+      lip_filter_groups.emplace(lip_filter_groups_index, move(lip_filter_ids));
+    } else {
+      for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_ids) {
+        lip_filter_groups_indexes_[lip_filter_index] = min_lip_filter_groups_index;
+      }
+
+      lip_filter_groups[min_lip_filter_groups_index].insert(lip_filter_ids.begin(), lip_filter_ids.end());
+
+      for (const LipFilterGroupIndex lip_filter_groups_index : lip_filter_groups_index_candidates) {
+        lip_filter_groups.erase(lip_filter_groups_index);
+      }
+    }
+  }
 }
 
 serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage() {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index ab4479c..0b9b848 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -21,7 +21,9 @@
 #define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
 
 #include <cstddef>
+#include <cstdint>
 #include <memory>
+#include <unordered_map>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -98,36 +100,43 @@ class QueryManagerDistributed final : public QueryManagerBase {
 
   /**
    * @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
-   * the Shiftboss index is not found, set using the block locality if found,
-   * otherwise <next_shiftboss_index_to_schedule>.
+   * the Shiftboss index is not found, set using <lip_filter_indexes> locality
+   * or the block locality if found, otherwise
+   * <next_shiftboss_index_to_schedule>.
    *
    * @param aggr_state_index The Hash Table for the Aggregation.
+   * @param lip_filter_indexes The LIP filter indexes.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
    * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
    **/
   void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+                                       const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
                                        const BlockLocator &block_locator,
                                        const block_id block,
                                        const std::size_t next_shiftboss_index_to_schedule,
                                        std::size_t *shiftboss_index) {
     DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size());
-    if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex &&
-        !block_locator.getBlockLocalityInfo(block, &shiftboss_indexes_for_aggrs_[aggr_state_index])) {
-      shiftboss_indexes_for_aggrs_[aggr_state_index] = next_shiftboss_index_to_schedule;
+    if (shiftboss_indexes_for_aggrs_[aggr_state_index] != kInvalidShiftbossIndex) {
+      *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index];
+      return;
     }
 
-    *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index];
+    getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+                            shiftboss_index);
+
+    shiftboss_indexes_for_aggrs_[aggr_state_index] = *shiftboss_index;
   }
 
   /**
    * @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
-   * Shiftboss index is not found, set using the block locality if found,
-   * otherwise <next_shiftboss_index_to_schedule>.
+   * Shiftboss index is not found, set using <lip_filter_indexes> locality or
+   * the block locality if found, otherwise <next_shiftboss_index_to_schedule>.
    *
    * @param join_hash_table_index The Hash Table for the Join.
    * @param part_id The partition ID.
+   * @param lip_filter_indexes The LIP filter indexes.
    * @param block_locator The BlockLocator to use.
    * @param block The block id to feed BlockLocator for the locality info.
    * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -135,6 +144,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
    **/
   void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index,
                                     const partition_id part_id,
+                                    const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
                                     const BlockLocator &block_locator,
                                     const block_id block,
                                     const std::size_t next_shiftboss_index_to_schedule,
@@ -142,13 +152,47 @@ class QueryManagerDistributed final : public QueryManagerBase {
     DCHECK_LT(join_hash_table_index, shiftboss_indexes_for_hash_joins_.size());
     DCHECK_LT(part_id, shiftboss_indexes_for_hash_joins_[join_hash_table_index].size());
 
-    if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex &&
-        !block_locator.getBlockLocalityInfo(block,
-                                            &shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id])) {
-      shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = next_shiftboss_index_to_schedule;
+    if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] != kInvalidShiftbossIndex) {
+      *shiftboss_index = shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id];
+      return;
     }
 
-    *shiftboss_index = shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id];
+    getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+                            shiftboss_index);
+
+    shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = *shiftboss_index;
+  }
+
+  /**
+   * @brief Get the index of Shiftboss for a LIP related WorkOrder. If the
+   * Shiftboss index is not found, set using the block locality if found,
+   * otherwise <next_shiftboss_index_to_schedule>.
+   *
+   * @param lip_filter_indexes The LIP filter indexes.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
+   * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForLip(const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+                               const BlockLocator &block_locator,
+                               const block_id block,
+                               const std::size_t next_shiftboss_index_to_schedule,
+                               std::size_t *shiftboss_index) {
+    if (!lip_filter_indexes.empty() &&
+        shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]]
+            != kInvalidShiftbossIndex) {
+      *shiftboss_index =
+          shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]];
+      return;
+    } else if (!block_locator.getBlockLocalityInfo(block, shiftboss_index)) {
+      *shiftboss_index = next_shiftboss_index_to_schedule;
+    }
+
+    if (!lip_filter_indexes.empty()) {
+      shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]] =
+          *shiftboss_index;
+    }
   }
 
  private:
@@ -166,6 +210,8 @@ class QueryManagerDistributed final : public QueryManagerBase {
            query_exec_state_->hasRebuildFinished(index, num_shiftbosses_);
   }
 
+  void computeLipFilterEquivalenceClasses(const serialization::QueryContext &query_context_proto);
+
   const tmb::client_id foreman_client_id_;
 
   // TODO(quickstep-team): deal with Shiftboss failure.
@@ -184,6 +230,15 @@ class QueryManagerDistributed final : public QueryManagerBase {
   // [QueryContext::join_hash_table_id][partition_id].
   std::vector<std::vector<std::size_t>> shiftboss_indexes_for_hash_joins_;
 
+  typedef std::int64_t LipFilterGroupIndex;
+
+  // From an LIP id (QueryContext::lip_filter_id) to its index of the group that
+  // is used in the same LIPFilterDeployment.
+  std::vector<LipFilterGroupIndex> lip_filter_groups_indexes_;
+
+  // From a LipFilterGroupIndex to its scheduled Shiftboss index.
+  std::unordered_map<LipFilterGroupIndex, std::size_t> shiftboss_indexes_for_lip_filter_groups_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_optimizer/LIPFilterGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.cpp b/query_optimizer/LIPFilterGenerator.cpp
index 2ce2ea8..3562974 100644
--- a/query_optimizer/LIPFilterGenerator.cpp
+++ b/query_optimizer/LIPFilterGenerator.cpp
@@ -21,6 +21,7 @@
 
 #include <map>
 #include <memory>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -34,6 +35,8 @@
 
 #include "glog/logging.h"
 
+using std::unordered_set;
+
 namespace quickstep {
 namespace optimizer {
 
@@ -100,7 +103,19 @@ void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
   for (const auto &entry : lip_filter_deployment_protos_) {
     RelationalOperator *relop =
         execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(entry.first);
-    relop->deployLIPFilters(entry.second.first);
+
+    const auto &lip_filter_deployment_proto_pair = entry.second;
+    unordered_set<QueryContext::lip_filter_id> lip_filter_indexes;
+    const serialization::LIPFilterDeployment &lip_filter_deployment_proto =
+        *(lip_filter_deployment_proto_pair.second);
+    for (int i = 0; i < lip_filter_deployment_proto.build_entries_size(); ++i) {
+      lip_filter_indexes.insert(lip_filter_deployment_proto.build_entries(i).lip_filter_id());
+    }
+    for (int i = 0; i < lip_filter_deployment_proto.probe_entries_size(); ++i) {
+      lip_filter_indexes.insert(lip_filter_deployment_proto.probe_entries(i).lip_filter_id());
+    }
+
+    relop->deployLIPFilters(lip_filter_deployment_proto_pair.first, lip_filter_indexes);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index b18b5ec..af6acb8 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -46,11 +46,6 @@ QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
 int main(int argc, char** argv) {
   google::InitGoogleLogging(argv[0]);
 
-  // TODO(quickstep-team): Fix JIRA QUICKSTEP-76 for adding LIP filter support
-  // in the distributed version.
-  quickstep::optimizer::FLAGS_use_lip_filters = false;
-  quickstep::optimizer::FLAGS_use_filter_joins = false;
-
   // Honor FLAGS_buffer_pool_slots in StorageManager.
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index e111f5b..c774719 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -98,6 +98,10 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_
   proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
   proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
 
+  for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+    proto->AddExtension(serialization::AggregationWorkOrder::lip_filter_indexes, lip_filter_index);
+  }
+
   return proto;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 8f40fbb..768c141 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -147,6 +147,10 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
   proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
   proto->SetExtension(serialization::BuildHashWorkOrder::lip_deployment_index, lip_deployment_index_);
 
+  for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+    proto->AddExtension(serialization::BuildHashWorkOrder::lip_filter_indexes, lip_filter_index);
+  }
+
   return proto;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
index f7c09cd..e7e549c 100644
--- a/relational_operators/BuildLIPFilterOperator.cpp
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -120,6 +120,10 @@ serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const blo
                       build_side_predicate_index_);
   proto->SetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index, lip_deployment_index_);
 
+  for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+    proto->AddExtension(serialization::BuildLIPFilterWorkOrder::lip_filter_indexes, lip_filter_index);
+  }
+
   return proto;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 77dc879..70bb185 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -388,6 +388,10 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
   proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_);
   proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
 
+  for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+    proto->AddExtension(serialization::HashJoinWorkOrder::lip_filter_indexes, lip_filter_index);
+  }
+
   return proto;
 }
 
@@ -446,6 +450,10 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
   proto->SetExtension(serialization::HashJoinWorkOrder::partition_id, part_id);
   proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
 
+  for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+    proto->AddExtension(serialization::HashJoinWorkOrder::lip_filter_indexes, lip_filter_index);
+  }
+
   for (const bool is_attribute_on_build : is_selection_on_build_) {
     proto->AddExtension(serialization::HashJoinWorkOrder::is_selection_on_build, is_attribute_on_build);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index c568654..425fa32 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -22,6 +22,7 @@
 
 #include <cstddef>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
@@ -274,8 +275,10 @@ class RelationalOperator {
   /**
    * @brief Deploy a group of LIPFilters to this operator.
    */
-  void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index) {
+  void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index,
+                        const std::unordered_set<QueryContext::lip_filter_id> &lip_filter_indexes) {
     lip_deployment_index_ = lip_deployment_index;
+    lip_filter_indexes_ = lip_filter_indexes;
   }
 
  protected:
@@ -300,6 +303,7 @@ class RelationalOperator {
   std::size_t op_index_;
 
   QueryContext::lip_deployment_id lip_deployment_index_;
+  std::unordered_set<QueryContext::lip_filter_id> lip_filter_indexes_;
 
  private:
   DISALLOW_COPY_AND_ASSIGN(RelationalOperator);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index b63f0be..935b104 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -150,6 +150,10 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
   proto->SetExtension(serialization::SelectWorkOrder::selection_index, selection_index_);
   proto->SetExtension(serialization::SelectWorkOrder::lip_deployment_index, lip_deployment_index_);
 
+  for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+    proto->AddExtension(serialization::SelectWorkOrder::lip_filter_indexes, lip_filter_index);
+  }
+
   return proto;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 7231c84..bac2eb0 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -64,6 +64,7 @@ message AggregationWorkOrder {
     optional uint32 aggr_state_index = 16;
     optional fixed64 block_id = 17;
     optional int32 lip_deployment_index = 18;
+    repeated uint32 lip_filter_indexes = 19;
   }
 }
 
@@ -76,7 +77,7 @@ message BuildAggregationExistenceMapWorkOrder {
   }
 }
 
-// Next tag: 39.
+// Next tag: 40.
 message BuildHashWorkOrder {
   extend WorkOrder {
     // All required.
@@ -87,6 +88,7 @@ message BuildHashWorkOrder {
     optional uint64 partition_id = 38;
     optional fixed64 block_id = 36;
     optional int32 lip_deployment_index = 37;
+    repeated uint32 lip_filter_indexes = 39;
   }
 }
 
@@ -97,6 +99,7 @@ message BuildLIPFilterWorkOrder {
     optional fixed64 build_block_id = 49;
     optional int32 build_side_predicate_index = 50;
     optional int32 lip_deployment_index = 51;
+    repeated uint32 lip_filter_indexes = 52;
   }
 }
 
@@ -141,7 +144,7 @@ message FinalizeAggregationWorkOrder {
   }
 }
 
-// Next tag: 173.
+// Next tag: 174.
 message HashJoinWorkOrder {
   enum HashJoinWorkOrderType {
     HASH_ANTI_JOIN = 0;
@@ -169,6 +172,7 @@ message HashJoinWorkOrder {
     repeated bool is_selection_on_build = 170;
 
     optional int32 lip_deployment_index = 171;
+    repeated uint32 lip_filter_indexes = 173;
   }
 }
 
@@ -227,6 +231,7 @@ message SelectWorkOrder {
     optional int32 selection_index = 246;
 
     optional int32 lip_deployment_index = 247;
+    repeated uint32 lip_filter_indexes = 248;
   }
 }