You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text rendering.)
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/06/13 21:13:46 UTC
incubator-quickstep git commit: QUICKSTEP-76: Enabled LIP in the distributed version.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 2a622460b -> d0c55320f
QUICKSTEP-76: Enabled LIP in the distributed version.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/d0c55320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/d0c55320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/d0c55320
Branch: refs/heads/master
Commit: d0c55320f476141b2747dc4048735ca8acebda34
Parents: 2a62246
Author: Zuyu Zhang <zu...@apache.org>
Authored: Fri May 5 17:53:09 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Jun 13 15:23:40 2017 -0500
----------------------------------------------------------------------
cli/distributed/QuickstepDistributedCli.cpp | 5 --
query_execution/CMakeLists.txt | 1 +
query_execution/ForemanDistributed.cpp | 56 ++++++++++++--
query_execution/ForemanDistributed.hpp | 4 +
query_execution/PolicyEnforcerDistributed.cpp | 20 +++++
query_execution/PolicyEnforcerDistributed.hpp | 35 ++++++++-
query_execution/QueryManagerDistributed.cpp | 61 +++++++++++++++
query_execution/QueryManagerDistributed.hpp | 81 ++++++++++++++++----
query_optimizer/LIPFilterGenerator.cpp | 17 +++-
.../tests/DistributedExecutionGeneratorTest.cpp | 5 --
relational_operators/AggregationOperator.cpp | 4 +
relational_operators/BuildHashOperator.cpp | 4 +
relational_operators/BuildLIPFilterOperator.cpp | 4 +
relational_operators/HashJoinOperator.cpp | 8 ++
relational_operators/RelationalOperator.hpp | 6 +-
relational_operators/SelectOperator.cpp | 4 +
relational_operators/WorkOrder.proto | 9 ++-
17 files changed, 285 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/cli/distributed/QuickstepDistributedCli.cpp
----------------------------------------------------------------------
diff --git a/cli/distributed/QuickstepDistributedCli.cpp b/cli/distributed/QuickstepDistributedCli.cpp
index 08443cd..513bedd 100644
--- a/cli/distributed/QuickstepDistributedCli.cpp
+++ b/cli/distributed/QuickstepDistributedCli.cpp
@@ -69,11 +69,6 @@ using quickstep::FLAGS_role;
int main(int argc, char *argv[]) {
google::InitGoogleLogging(argv[0]);
- // TODO(quickstep-team): Fix JIRA QUICKSTEP-76 for adding LIP filter support
- // in the distributed version.
- quickstep::optimizer::FLAGS_use_lip_filters = false;
- quickstep::optimizer::FLAGS_use_filter_joins = false;
-
gflags::ParseCommandLineFlags(&argc, &argv, true);
grpc_init();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index c74fa36..4c3b52a 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -281,6 +281,7 @@ if (ENABLE_DISTRIBUTED)
quickstep_storage_StorageBlockInfo
quickstep_utility_DAG
quickstep_utility_Macros
+ quickstep_utility_lipfilter_LIPFilter_proto
tmb)
endif(ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution_QueryManagerSingleNode
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index e5e0eee..fbac18e 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -243,11 +243,17 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
size_t *shiftboss_index_for_aggregation) {
const S::WorkOrder &work_order_proto = proto.work_order();
QueryContext::aggregation_state_id aggr_state_index;
+ vector<QueryContext::lip_filter_id> lip_filter_indexes;
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::AGGREGATION:
aggr_state_index = work_order_proto.GetExtension(S::AggregationWorkOrder::aggr_state_index);
+
+ for (int i = 0; i < work_order_proto.ExtensionSize(S::AggregationWorkOrder::lip_filter_indexes); ++i) {
+ lip_filter_indexes.push_back(work_order_proto.GetExtension(S::AggregationWorkOrder::lip_filter_indexes, i));
+ }
+
block = work_order_proto.GetExtension(S::AggregationWorkOrder::block_id);
break;
case S::FINALIZE_AGGREGATION:
@@ -261,8 +267,8 @@ bool ForemanDistributed::isAggregationRelatedWorkOrder(const S::WorkOrderMessage
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForAggregation(
- proto.query_id(), aggr_state_index, block_locator_, block, next_shiftboss_index_to_schedule,
- shiftboss_index_for_aggregation);
+ proto.query_id(), aggr_state_index, lip_filter_indexes, block_locator_, block,
+ next_shiftboss_index_to_schedule, shiftboss_index_for_aggregation);
return true;
}
@@ -273,12 +279,18 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
const S::WorkOrder &work_order_proto = proto.work_order();
QueryContext::join_hash_table_id join_hash_table_index;
partition_id part_id;
+ vector<QueryContext::lip_filter_id> lip_filter_indexes;
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
case S::BUILD_HASH:
join_hash_table_index = work_order_proto.GetExtension(S::BuildHashWorkOrder::join_hash_table_index);
part_id = work_order_proto.GetExtension(S::BuildHashWorkOrder::partition_id);
+
+ for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildHashWorkOrder::lip_filter_indexes); ++i) {
+ lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildHashWorkOrder::lip_filter_indexes, i));
+ }
+
block = work_order_proto.GetExtension(S::BuildHashWorkOrder::block_id);
break;
case S::HASH_JOIN:
@@ -294,8 +306,39 @@ bool ForemanDistributed::isHashJoinRelatedWorkOrder(const S::WorkOrderMessage &p
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForHashJoin(
- proto.query_id(), join_hash_table_index, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
- shiftboss_index_for_hash_join);
+ proto.query_id(), join_hash_table_index, part_id, lip_filter_indexes, block_locator_, block,
+ next_shiftboss_index_to_schedule, shiftboss_index_for_hash_join);
+
+ return true;
+}
+
+// Decides the Shiftboss for WorkOrders that are placed by the LIP filters they
+// use (BUILD_LIP_FILTER and SELECT). Returns true after writing the choice to
+// *shiftboss_index_for_lip; for any other WorkOrder type returns false without
+// touching it, so dispatchWorkOrderMessages() falls through to its next check.
+bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
+ const size_t next_shiftboss_index_to_schedule,
+ size_t *shiftboss_index_for_lip) {
+ const S::WorkOrder &work_order_proto = proto.work_order();
+ vector<QueryContext::lip_filter_id> lip_filter_indexes;
+ block_id block = kInvalidBlockId;
+
+ // Gather the LIP filter ids used by the WorkOrder and its input block (for locality).
+ switch (work_order_proto.work_order_type()) {
+ case S::BUILD_LIP_FILTER:
+ for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildLIPFilterWorkOrder::lip_filter_indexes); ++i) {
+ lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::lip_filter_indexes, i));
+ }
+ // The build side block is the one whose locality matters here.
+ block = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::build_block_id);
+ break;
+ case S::SELECT:
+ for (int i = 0; i < work_order_proto.ExtensionSize(S::SelectWorkOrder::lip_filter_indexes); ++i) {
+ lip_filter_indexes.push_back(work_order_proto.GetExtension(S::SelectWorkOrder::lip_filter_indexes, i));
+ }
+ block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
+ break;
+ default:
+ // Not a LIP related WorkOrder.
+ return false;
+ }
+
+ // The PolicyEnforcer forwards to the query's QueryManagerDistributed, which picks the Shiftboss.
+ static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForLip(
+ proto.query_id(), lip_filter_indexes, block_locator_, block, next_shiftboss_index_to_schedule,
+ shiftboss_index_for_lip);
return true;
}
@@ -329,10 +372,6 @@ bool hasBlockLocalityInfo(const serialization::WorkOrder &work_order_proto,
block = work_order_proto.GetExtension(S::SaveBlocksWorkOrder::block_id);
break;
}
- case S::SELECT: {
- block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
- break;
- }
case S::SORT_RUN_GENERATION: {
block = work_order_proto.GetExtension(S::SortRunGenerationWorkOrder::block_id);
break;
@@ -359,6 +398,7 @@ void ForemanDistributed::dispatchWorkOrderMessages(const vector<unique_ptr<S::Wo
if (policy_enforcer_dist->isSingleNodeQuery(proto.query_id())) {
// Always schedule the single-node query to the same Shiftboss.
shiftboss_index_for_particular_work_order_type = kDefaultShiftbossIndex;
+ } else if (isLipRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isAggregationRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (isHashJoinRelatedWorkOrder(proto, shiftboss_index, &shiftboss_index_for_particular_work_order_type)) {
} else if (hasBlockLocalityInfo(work_order_proto, block_locator_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/ForemanDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.hpp b/query_execution/ForemanDistributed.hpp
index b975428..543d83f 100644
--- a/query_execution/ForemanDistributed.hpp
+++ b/query_execution/ForemanDistributed.hpp
@@ -98,6 +98,10 @@ class ForemanDistributed final : public ForemanBase {
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index_for_hash_join);
+ bool isLipRelatedWorkOrder(const serialization::WorkOrderMessage &proto,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index_for_lip);
+
/**
* @brief Dispatch schedulable WorkOrders, wrapped in WorkOrderMessages to the
* worker threads.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index b410152..0a4fd30 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -192,6 +192,7 @@ void PolicyEnforcerDistributed::processInitiateRebuildResponseMessage(const tmb:
void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
@@ -199,6 +200,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForAggregation(
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
query_manager->getShiftbossIndexForAggregation(aggr_state_index,
+ lip_filter_indexes,
block_locator,
block,
next_shiftboss_index_to_schedule,
@@ -209,6 +211,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
const std::size_t query_id,
const QueryContext::join_hash_table_id join_hash_table_index,
const partition_id part_id,
+ const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
@@ -217,12 +220,29 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
query_manager->getShiftbossIndexForHashJoin(join_hash_table_index,
part_id,
+ lip_filter_indexes,
block_locator,
block,
next_shiftboss_index_to_schedule,
shiftboss_index);
}
+void PolicyEnforcerDistributed::getShiftbossIndexForLip(
+    const std::size_t query_id,
+    const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+    const BlockLocator &block_locator,
+    const block_id block,
+    const std::size_t next_shiftboss_index_to_schedule,
+    std::size_t *shiftboss_index) {
+  // The query must already be admitted; its QueryManagerDistributed owns the
+  // LIP group -> Shiftboss bookkeeping and makes the actual decision.
+  DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
+  QueryManagerDistributed *manager =
+      static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
+  manager->getShiftbossIndexForLip(lip_filter_indexes, block_locator, block,
+                                   next_shiftboss_index_to_schedule, shiftboss_index);
+}
+
void PolicyEnforcerDistributed::initiateQueryInShiftboss(QueryHandle *query_handle) {
S::QueryInitiateMessage proto;
proto.set_query_id(query_handle->query_id());
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index cd3a434..f44fd2e 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -126,13 +126,14 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
/**
* @brief Get or set the index of Shiftboss for an Aggregation related
* WorkOrder. If it is the first Aggregation on <aggr_state_index>,
- * <shiftboss_index> will be set based on block locality if found,
- * otherwise <next_shiftboss_index_to_schedule>.
+ * <shiftboss_index> will be set based on <lip_filter_indexes> locality or
+ * block locality if found, otherwise <next_shiftboss_index_to_schedule>.
* Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
* has executed the first Aggregation.
*
* @param query_id The query id.
* @param aggr_state_index The Hash Table for the Aggregation.
+ * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -141,6 +142,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void getShiftbossIndexForAggregation(
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
+ const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
@@ -149,14 +151,15 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
/**
* @brief Get or set the index of Shiftboss for a HashJoin related WorkOrder.
* If it is the first BuildHash on <join_hash_table_index, part_id>,
- * <shiftboss_index> will be set to block locality if found,
- * otherwise <next_shiftboss_index_to_schedule>.
+ * <shiftboss_index> will be set based on <lip_filter_indexes> locality or
+ * block locality if found, otherwise <next_shiftboss_index_to_schedule>.
* Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
* has executed the first BuildHash.
*
* @param query_id The query id.
* @param join_hash_table_index The Hash Table for the Join.
* @param part_id The partition ID.
+ * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -166,6 +169,30 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
const std::size_t query_id,
const QueryContext::join_hash_table_id join_hash_table_index,
const partition_id part_id,
+ const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+ const BlockLocator &block_locator,
+ const block_id block,
+ const std::size_t next_shiftboss_index_to_schedule,
+ std::size_t *shiftboss_index);
+
+ /**
+ * @brief Get or set the index of Shiftboss for a LIP related WorkOrder.
+ * If it is the first WorkOrder on <lip_filter_indexes>,
+ * <shiftboss_index> will be set based on block locality if found,
+ * otherwise <next_shiftboss_index_to_schedule>.
+ * Otherwise, <shiftboss_index> will be set to the index of the Shiftboss that
+ * has executed the first WorkOrder.
+ *
+ * @param query_id The query id.
+ * @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
+ * @param block_locator The BlockLocator to use.
+ * @param block The block id to feed BlockLocator for the locality info.
+ * @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
+ * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+ **/
+ void getShiftbossIndexForLip(
+ const std::size_t query_id,
+ const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index b304f9f..c9780fa 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -19,9 +19,12 @@
#include "query_execution/QueryManagerDistributed.hpp"
+#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <memory>
+#include <unordered_map>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -34,6 +37,7 @@
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "utility/DAG.hpp"
+#include "utility/lip_filter/LIPFilter.pb.h"
#include "glog/logging.h"
@@ -46,6 +50,7 @@ using std::malloc;
using std::move;
using std::size_t;
using std::unique_ptr;
+using std::unordered_set;
using std::vector;
namespace quickstep {
@@ -77,6 +82,62 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
shiftboss_indexes_for_hash_joins_.push_back(
vector<size_t>(query_context_proto.join_hash_tables(i).num_partitions(), kInvalidShiftbossIndex));
}
+
+ computeLipFilterEquivalenceClasses(query_context_proto);
+}
+
+void QueryManagerDistributed::computeLipFilterEquivalenceClasses(
+    const serialization::QueryContext &query_context_proto) {
+  // Partitions the LIP filters into equivalence classes: two filters end up in
+  // the same group whenever they are (transitively) referenced by the same
+  // LIPFilterDeployment, via its build or probe entries. The result is stored
+  // in lip_filter_groups_indexes_, indexed by QueryContext::lip_filter_id;
+  // filters used by no deployment keep kInvalidLipFilterGroupIndex.
+  static constexpr LipFilterGroupIndex kInvalidLipFilterGroupIndex = static_cast<LipFilterGroupIndex>(-1);
+  lip_filter_groups_indexes_.resize(query_context_proto.lip_filters_size(), kInvalidLipFilterGroupIndex);
+
+  // Members of every live group, so that a merge can relabel ALL filters of an
+  // absorbed group, not only those mentioned by the current deployment.
+  std::unordered_map<LipFilterGroupIndex, unordered_set<QueryContext::lip_filter_id>> lip_filter_groups;
+  // Group indexes are handed out monotonically. Deriving them from
+  // lip_filter_groups.size() would reuse a still-live index once groups have
+  // been merged and erased, silently fusing unrelated filters.
+  LipFilterGroupIndex next_lip_filter_groups_index = 0;
+
+  for (int i = 0; i < query_context_proto.lip_filter_deployments_size(); ++i) {
+    const serialization::LIPFilterDeployment &lip_filter_deployment = query_context_proto.lip_filter_deployments(i);
+
+    unordered_set<QueryContext::lip_filter_id> lip_filter_ids;
+    for (int j = 0; j < lip_filter_deployment.build_entries_size(); ++j) {
+      lip_filter_ids.insert(lip_filter_deployment.build_entries(j).lip_filter_id());
+    }
+    for (int j = 0; j < lip_filter_deployment.probe_entries_size(); ++j) {
+      lip_filter_ids.insert(lip_filter_deployment.probe_entries(j).lip_filter_id());
+    }
+
+    if (lip_filter_ids.empty()) {
+      continue;
+    }
+
+    // Existing groups touched by this deployment; all of them collapse into
+    // the one with the smallest index.
+    unordered_set<LipFilterGroupIndex> touched_groups_indexes;
+    LipFilterGroupIndex target_groups_index = kInvalidLipFilterGroupIndex;
+    for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_ids) {
+      DCHECK_LT(lip_filter_index, lip_filter_groups_indexes_.size());
+      const LipFilterGroupIndex lip_filter_groups_index = lip_filter_groups_indexes_[lip_filter_index];
+      if (lip_filter_groups_index == kInvalidLipFilterGroupIndex) {
+        continue;
+      }
+      touched_groups_indexes.insert(lip_filter_groups_index);
+      target_groups_index = (target_groups_index == kInvalidLipFilterGroupIndex)
+          ? lip_filter_groups_index
+          : std::min(target_groups_index, lip_filter_groups_index);
+    }
+
+    if (target_groups_index == kInvalidLipFilterGroupIndex) {
+      // None of these filters has been grouped before: start a fresh group.
+      target_groups_index = next_lip_filter_groups_index++;
+    }
+
+    // References to other elements stay valid across unordered_map::erase below.
+    unordered_set<QueryContext::lip_filter_id> &target_group = lip_filter_groups[target_groups_index];
+
+    // Absorb every other touched group, relabeling each of its members.
+    for (const LipFilterGroupIndex lip_filter_groups_index : touched_groups_indexes) {
+      if (lip_filter_groups_index == target_groups_index) {
+        continue;
+      }
+      const auto it = lip_filter_groups.find(lip_filter_groups_index);
+      DCHECK(it != lip_filter_groups.end());
+      for (const QueryContext::lip_filter_id member : it->second) {
+        lip_filter_groups_indexes_[member] = target_groups_index;
+      }
+      target_group.insert(it->second.begin(), it->second.end());
+      lip_filter_groups.erase(it);
+    }
+
+    for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_ids) {
+      lip_filter_groups_indexes_[lip_filter_index] = target_groups_index;
+    }
+    target_group.insert(lip_filter_ids.begin(), lip_filter_ids.end());
+  }
}
serialization::WorkOrderMessage* QueryManagerDistributed::getNextWorkOrderMessage() {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index ab4479c..0b9b848 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -21,7 +21,9 @@
#define QUICKSTEP_QUERY_EXECUTION_QUERY_MANAGER_DISTRIBUTED_HPP_
#include <cstddef>
+#include <cstdint>
#include <memory>
+#include <unordered_map>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -98,36 +100,43 @@ class QueryManagerDistributed final : public QueryManagerBase {
/**
* @brief Get the index of Shiftboss for an Aggregation related WorkOrder. If
- * the Shiftboss index is not found, set using the block locality if found,
- * otherwise <next_shiftboss_index_to_schedule>.
+ * the Shiftboss index is not found, set using <lip_filter_indexes> locality
+ * or the block locality if found, otherwise
+ * <next_shiftboss_index_to_schedule>.
*
* @param aggr_state_index The Hash Table for the Aggregation.
+ * @param lip_filter_indexes The LIP filter indexes.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
* @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
**/
void getShiftbossIndexForAggregation(const QueryContext::aggregation_state_id aggr_state_index,
+ const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index) {
DCHECK_LT(aggr_state_index, shiftboss_indexes_for_aggrs_.size());
- if (shiftboss_indexes_for_aggrs_[aggr_state_index] == kInvalidShiftbossIndex &&
- !block_locator.getBlockLocalityInfo(block, &shiftboss_indexes_for_aggrs_[aggr_state_index])) {
- shiftboss_indexes_for_aggrs_[aggr_state_index] = next_shiftboss_index_to_schedule;
+ if (shiftboss_indexes_for_aggrs_[aggr_state_index] != kInvalidShiftbossIndex) {
+ *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index];
+ return;
}
- *shiftboss_index = shiftboss_indexes_for_aggrs_[aggr_state_index];
+ getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+ shiftboss_index);
+
+ shiftboss_indexes_for_aggrs_[aggr_state_index] = *shiftboss_index;
}
/**
* @brief Get the index of Shiftboss for a HashJoin related WorkOrder. If the
- * Shiftboss index is not found, set using the block locality if found,
- * otherwise <next_shiftboss_index_to_schedule>.
+ * Shiftboss index is not found, set using <lip_filter_indexes> locality or
+ * the block locality if found, otherwise <next_shiftboss_index_to_schedule>.
*
* @param join_hash_table_index The Hash Table for the Join.
* @param part_id The partition ID.
+ * @param lip_filter_indexes The LIP filter indexes.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -135,6 +144,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
**/
void getShiftbossIndexForHashJoin(const QueryContext::join_hash_table_id join_hash_table_index,
const partition_id part_id,
+ const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
@@ -142,13 +152,47 @@ class QueryManagerDistributed final : public QueryManagerBase {
DCHECK_LT(join_hash_table_index, shiftboss_indexes_for_hash_joins_.size());
DCHECK_LT(part_id, shiftboss_indexes_for_hash_joins_[join_hash_table_index].size());
- if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] == kInvalidShiftbossIndex &&
- !block_locator.getBlockLocalityInfo(block,
- &shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id])) {
- shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = next_shiftboss_index_to_schedule;
+ if (shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] != kInvalidShiftbossIndex) {
+ *shiftboss_index = shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id];
+ return;
}
- *shiftboss_index = shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id];
+ getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+ shiftboss_index);
+
+ shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = *shiftboss_index;
+ }
+
+  /**
+   * @brief Get the index of Shiftboss for a LIP related WorkOrder.
+   *
+   * WorkOrders whose first LIP filter belongs to the same LIP filter group are
+   * sent to the same Shiftboss. The first time a group is seen, its Shiftboss
+   * is chosen from the locality of <block> if known, otherwise
+   * <next_shiftboss_index_to_schedule>, and remembered for later WorkOrders.
+   * With empty <lip_filter_indexes>, only block locality (or
+   * <next_shiftboss_index_to_schedule>) is used and nothing is remembered.
+   *
+   * @param lip_filter_indexes The LIP filter indexes.
+   * @param block_locator The BlockLocator to use.
+   * @param block The block id to feed BlockLocator for the locality info.
+   * @param next_shiftboss_index_to_schedule The index of Shiftboss to schedule a next WorkOrder.
+   * @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
+   **/
+  void getShiftbossIndexForLip(const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+                               const BlockLocator &block_locator,
+                               const block_id block,
+                               const std::size_t next_shiftboss_index_to_schedule,
+                               std::size_t *shiftboss_index) {
+    if (lip_filter_indexes.empty()) {
+      if (!block_locator.getBlockLocalityInfo(block, shiftboss_index)) {
+        *shiftboss_index = next_shiftboss_index_to_schedule;
+      }
+      return;
+    }
+
+    DCHECK_LT(lip_filter_indexes.front(), lip_filter_groups_indexes_.size());
+    const LipFilterGroupIndex lip_filter_groups_index = lip_filter_groups_indexes_[lip_filter_indexes.front()];
+
+    // Look up with find(): operator[] would default-insert a 0 for an unseen
+    // group, which then compares != kInvalidShiftbossIndex and is mistaken for
+    // an assigned Shiftboss, so block locality would never be consulted.
+    const auto cit = shiftboss_indexes_for_lip_filter_groups_.find(lip_filter_groups_index);
+    if (cit != shiftboss_indexes_for_lip_filter_groups_.end()) {
+      *shiftboss_index = cit->second;
+      return;
+    }
+
+    if (!block_locator.getBlockLocalityInfo(block, shiftboss_index)) {
+      *shiftboss_index = next_shiftboss_index_to_schedule;
+    }
+    shiftboss_indexes_for_lip_filter_groups_.emplace(lip_filter_groups_index, *shiftboss_index);
}
private:
@@ -166,6 +210,8 @@ class QueryManagerDistributed final : public QueryManagerBase {
query_exec_state_->hasRebuildFinished(index, num_shiftbosses_);
}
+ void computeLipFilterEquivalenceClasses(const serialization::QueryContext &query_context_proto);
+
const tmb::client_id foreman_client_id_;
// TODO(quickstep-team): deal with Shiftboss failure.
@@ -184,6 +230,15 @@ class QueryManagerDistributed final : public QueryManagerBase {
// [QueryContext::join_hash_table_id][partition_id].
std::vector<std::vector<std::size_t>> shiftboss_indexes_for_hash_joins_;
+ typedef std::int64_t LipFilterGroupIndex;
+
+ // From an LIP id (QueryContext::lip_filter_id) to its index of the group that
+ // is used in the same LIPFilterDeployment.
+ std::vector<LipFilterGroupIndex> lip_filter_groups_indexes_;
+
+ // From a LipFilterGroupIndex to its scheduled Shiftboss index.
+ std::unordered_map<LipFilterGroupIndex, std::size_t> shiftboss_indexes_for_lip_filter_groups_;
+
DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_optimizer/LIPFilterGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/LIPFilterGenerator.cpp b/query_optimizer/LIPFilterGenerator.cpp
index 2ce2ea8..3562974 100644
--- a/query_optimizer/LIPFilterGenerator.cpp
+++ b/query_optimizer/LIPFilterGenerator.cpp
@@ -21,6 +21,7 @@
#include <map>
#include <memory>
+#include <unordered_set>
#include <utility>
#include <vector>
@@ -34,6 +35,8 @@
#include "glog/logging.h"
+using std::unordered_set;
+
namespace quickstep {
namespace optimizer {
@@ -100,7 +103,19 @@ void LIPFilterGenerator::deployLIPFilters(QueryPlan *execution_plan,
for (const auto &entry : lip_filter_deployment_protos_) {
RelationalOperator *relop =
execution_plan->getQueryPlanDAGMutable()->getNodePayloadMutable(entry.first);
- relop->deployLIPFilters(entry.second.first);
+
+ const auto &lip_filter_deployment_proto_pair = entry.second;
+ unordered_set<QueryContext::lip_filter_id> lip_filter_indexes;
+ const serialization::LIPFilterDeployment &lip_filter_deployment_proto =
+ *(lip_filter_deployment_proto_pair.second);
+ for (int i = 0; i < lip_filter_deployment_proto.build_entries_size(); ++i) {
+ lip_filter_indexes.insert(lip_filter_deployment_proto.build_entries(i).lip_filter_id());
+ }
+ for (int i = 0; i < lip_filter_deployment_proto.probe_entries_size(); ++i) {
+ lip_filter_indexes.insert(lip_filter_deployment_proto.probe_entries(i).lip_filter_id());
+ }
+
+ relop->deployLIPFilters(lip_filter_deployment_proto_pair.first, lip_filter_indexes);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
index b18b5ec..af6acb8 100644
--- a/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
+++ b/query_optimizer/tests/DistributedExecutionGeneratorTest.cpp
@@ -46,11 +46,6 @@ QUICKSTEP_GENERATE_TEXT_TEST(DISTRIBUTED_EXECUTION_GENERATOR_TEST);
int main(int argc, char** argv) {
google::InitGoogleLogging(argv[0]);
- // TODO(quickstep-team): Fix JIRA QUICKSTEP-76 for adding LIP filter support
- // in the distributed version.
- quickstep::optimizer::FLAGS_use_lip_filters = false;
- quickstep::optimizer::FLAGS_use_filter_joins = false;
-
// Honor FLAGS_buffer_pool_slots in StorageManager.
gflags::ParseCommandLineFlags(&argc, &argv, true);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/AggregationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.cpp b/relational_operators/AggregationOperator.cpp
index e111f5b..c774719 100644
--- a/relational_operators/AggregationOperator.cpp
+++ b/relational_operators/AggregationOperator.cpp
@@ -98,6 +98,10 @@ serialization::WorkOrder* AggregationOperator::createWorkOrderProto(const block_
proto->SetExtension(serialization::AggregationWorkOrder::aggr_state_index, aggr_state_index_);
proto->SetExtension(serialization::AggregationWorkOrder::lip_deployment_index, lip_deployment_index_);
+ for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+ proto->AddExtension(serialization::AggregationWorkOrder::lip_filter_indexes, lip_filter_index);
+ }
+
return proto;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/BuildHashOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.cpp b/relational_operators/BuildHashOperator.cpp
index 8f40fbb..768c141 100644
--- a/relational_operators/BuildHashOperator.cpp
+++ b/relational_operators/BuildHashOperator.cpp
@@ -147,6 +147,10 @@ serialization::WorkOrder* BuildHashOperator::createWorkOrderProto(const block_id
proto->SetExtension(serialization::BuildHashWorkOrder::block_id, block);
proto->SetExtension(serialization::BuildHashWorkOrder::lip_deployment_index, lip_deployment_index_);
+ for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+ proto->AddExtension(serialization::BuildHashWorkOrder::lip_filter_indexes, lip_filter_index);
+ }
+
return proto;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/BuildLIPFilterOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildLIPFilterOperator.cpp b/relational_operators/BuildLIPFilterOperator.cpp
index f7c09cd..e7e549c 100644
--- a/relational_operators/BuildLIPFilterOperator.cpp
+++ b/relational_operators/BuildLIPFilterOperator.cpp
@@ -120,6 +120,10 @@ serialization::WorkOrder* BuildLIPFilterOperator::createWorkOrderProto(const blo
build_side_predicate_index_);
proto->SetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index, lip_deployment_index_);
+ for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+ proto->AddExtension(serialization::BuildLIPFilterWorkOrder::lip_filter_indexes, lip_filter_index);
+ }
+
return proto;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/HashJoinOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 77dc879..70bb185 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -388,6 +388,10 @@ serialization::WorkOrder* HashJoinOperator::createNonOuterJoinWorkOrderProto(
proto->SetExtension(serialization::HashJoinWorkOrder::residual_predicate_index, residual_predicate_index_);
proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
+ for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+ proto->AddExtension(serialization::HashJoinWorkOrder::lip_filter_indexes, lip_filter_index);
+ }
+
return proto;
}
@@ -446,6 +450,10 @@ serialization::WorkOrder* HashJoinOperator::createOuterJoinWorkOrderProto(const
proto->SetExtension(serialization::HashJoinWorkOrder::partition_id, part_id);
proto->SetExtension(serialization::HashJoinWorkOrder::lip_deployment_index, lip_deployment_index_);
+ for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+ proto->AddExtension(serialization::HashJoinWorkOrder::lip_filter_indexes, lip_filter_index);
+ }
+
for (const bool is_attribute_on_build : is_selection_on_build_) {
proto->AddExtension(serialization::HashJoinWorkOrder::is_selection_on_build, is_attribute_on_build);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index c568654..425fa32 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -22,6 +22,7 @@
#include <cstddef>
#include <string>
+#include <unordered_set>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -274,8 +275,10 @@ class RelationalOperator {
/**
* @brief Deploy a group of LIPFilters to this operator.
+ *
+ * @param lip_deployment_index The QueryContext::lip_deployment_id of the LIP
+ *        deployment attached to this operator.
+ * @param lip_filter_indexes The QueryContext::lip_filter_id values to record
+ *        for this operator (presumably the filters used by that deployment;
+ *        confirm against the caller in LIPFilterGenerator). The set is copied
+ *        into lip_filter_indexes_, and each createWorkOrderProto() override
+ *        serializes it into the WorkOrder proto's lip_filter_indexes field.
*/
- void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index) {
+ void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index,
+ const std::unordered_set<QueryContext::lip_filter_id> &lip_filter_indexes) {
lip_deployment_index_ = lip_deployment_index;
+ // Stored by value so the ids remain available after the caller's set goes away.
+ lip_filter_indexes_ = lip_filter_indexes;
}
protected:
@@ -300,6 +303,7 @@ class RelationalOperator {
std::size_t op_index_;
QueryContext::lip_deployment_id lip_deployment_index_;
+ std::unordered_set<QueryContext::lip_filter_id> lip_filter_indexes_;
private:
DISALLOW_COPY_AND_ASSIGN(RelationalOperator);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index b63f0be..935b104 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -150,6 +150,10 @@ serialization::WorkOrder* SelectOperator::createWorkOrderProto(const block_id bl
proto->SetExtension(serialization::SelectWorkOrder::selection_index, selection_index_);
proto->SetExtension(serialization::SelectWorkOrder::lip_deployment_index, lip_deployment_index_);
+ for (const QueryContext::lip_filter_id lip_filter_index : lip_filter_indexes_) {
+ proto->AddExtension(serialization::SelectWorkOrder::lip_filter_indexes, lip_filter_index);
+ }
+
return proto;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/d0c55320/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 7231c84..bac2eb0 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -64,6 +64,7 @@ message AggregationWorkOrder {
optional uint32 aggr_state_index = 16;
optional fixed64 block_id = 17;
optional int32 lip_deployment_index = 18;
+ repeated uint32 lip_filter_indexes = 19;
}
}
@@ -76,7 +77,7 @@ message BuildAggregationExistenceMapWorkOrder {
}
}
-// Next tag: 39.
+// Next tag: 40.
message BuildHashWorkOrder {
extend WorkOrder {
// All required.
@@ -87,6 +88,7 @@ message BuildHashWorkOrder {
optional uint64 partition_id = 38;
optional fixed64 block_id = 36;
optional int32 lip_deployment_index = 37;
+ repeated uint32 lip_filter_indexes = 39;
}
}
@@ -97,6 +99,7 @@ message BuildLIPFilterWorkOrder {
optional fixed64 build_block_id = 49;
optional int32 build_side_predicate_index = 50;
optional int32 lip_deployment_index = 51;
+ repeated uint32 lip_filter_indexes = 52;
}
}
@@ -141,7 +144,7 @@ message FinalizeAggregationWorkOrder {
}
}
-// Next tag: 173.
+// Next tag: 174.
message HashJoinWorkOrder {
enum HashJoinWorkOrderType {
HASH_ANTI_JOIN = 0;
@@ -169,6 +172,7 @@ message HashJoinWorkOrder {
repeated bool is_selection_on_build = 170;
optional int32 lip_deployment_index = 171;
+ repeated uint32 lip_filter_indexes = 173;
}
}
@@ -227,6 +231,7 @@ message SelectWorkOrder {
optional int32 selection_index = 246;
optional int32 lip_deployment_index = 247;
+ repeated uint32 lip_filter_indexes = 248;
}
}