You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/15 23:35:36 UTC
incubator-quickstep git commit: Added the execution support for LIP
with partitions.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 5fbfd2111 -> 1c749c911
Added the execution support for LIP with partitions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/1c749c91
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/1c749c91
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/1c749c91
Branch: refs/heads/master
Commit: 1c749c911814926c48b8c4f7f34519605dc435a2
Parents: 5fbfd21
Author: Zuyu Zhang <zu...@apache.org>
Authored: Thu Jun 15 17:38:53 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jun 15 17:38:53 2017 -0500
----------------------------------------------------------------------
query_execution/ForemanDistributed.cpp | 5 ++++-
query_execution/PolicyEnforcerDistributed.cpp | 2 ++
query_execution/PolicyEnforcerDistributed.hpp | 2 ++
query_execution/QueryContext.proto | 2 ++
query_execution/QueryManagerDistributed.cpp | 6 ++++++
query_execution/QueryManagerDistributed.hpp | 24 ++++++++++++++++------
6 files changed, 34 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/ForemanDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/ForemanDistributed.cpp b/query_execution/ForemanDistributed.cpp
index 49f2101..942f383 100644
--- a/query_execution/ForemanDistributed.cpp
+++ b/query_execution/ForemanDistributed.cpp
@@ -329,6 +329,7 @@ bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
size_t *shiftboss_index_for_lip) {
const S::WorkOrder &work_order_proto = proto.work_order();
vector<QueryContext::lip_filter_id> lip_filter_indexes;
+ partition_id part_id;
block_id block = kInvalidBlockId;
switch (work_order_proto.work_order_type()) {
@@ -336,12 +337,14 @@ bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
for (int i = 0; i < work_order_proto.ExtensionSize(S::BuildLIPFilterWorkOrder::lip_filter_indexes); ++i) {
lip_filter_indexes.push_back(work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::lip_filter_indexes, i));
}
+ part_id = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::partition_id);
block = work_order_proto.GetExtension(S::BuildLIPFilterWorkOrder::build_block_id);
break;
case S::SELECT:
for (int i = 0; i < work_order_proto.ExtensionSize(S::SelectWorkOrder::lip_filter_indexes); ++i) {
lip_filter_indexes.push_back(work_order_proto.GetExtension(S::SelectWorkOrder::lip_filter_indexes, i));
}
+ part_id = work_order_proto.GetExtension(S::SelectWorkOrder::partition_id);
block = work_order_proto.GetExtension(S::SelectWorkOrder::block_id);
break;
default:
@@ -349,7 +352,7 @@ bool ForemanDistributed::isLipRelatedWorkOrder(const S::WorkOrderMessage &proto,
}
static_cast<PolicyEnforcerDistributed*>(policy_enforcer_.get())->getShiftbossIndexForLip(
- proto.query_id(), lip_filter_indexes, block_locator_, block, next_shiftboss_index_to_schedule,
+ proto.query_id(), lip_filter_indexes, part_id, block_locator_, block, next_shiftboss_index_to_schedule,
shiftboss_index_for_lip);
return true;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/PolicyEnforcerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.cpp b/query_execution/PolicyEnforcerDistributed.cpp
index 766c351..2bd25fa 100644
--- a/query_execution/PolicyEnforcerDistributed.cpp
+++ b/query_execution/PolicyEnforcerDistributed.cpp
@@ -232,6 +232,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForHashJoin(
void PolicyEnforcerDistributed::getShiftbossIndexForLip(
const std::size_t query_id,
const vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+ const partition_id part_id,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
@@ -239,6 +240,7 @@ void PolicyEnforcerDistributed::getShiftbossIndexForLip(
DCHECK(admitted_queries_.find(query_id) != admitted_queries_.end());
QueryManagerDistributed *query_manager = static_cast<QueryManagerDistributed*>(admitted_queries_[query_id].get());
query_manager->getShiftbossIndexForLip(lip_filter_indexes,
+ part_id,
block_locator,
block,
next_shiftboss_index_to_schedule,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/PolicyEnforcerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerDistributed.hpp b/query_execution/PolicyEnforcerDistributed.hpp
index 23b0017..25a8ce0 100644
--- a/query_execution/PolicyEnforcerDistributed.hpp
+++ b/query_execution/PolicyEnforcerDistributed.hpp
@@ -187,6 +187,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
*
* @param query_id The query id.
* @param lip_filter_indexes The LIP filter indexes used by the WorkOrder.
+ * @param part_id The partition ID.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
@@ -195,6 +196,7 @@ class PolicyEnforcerDistributed final : public PolicyEnforcerBase {
void getShiftbossIndexForLip(
const std::size_t query_id,
const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+ const partition_id part_id,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/QueryContext.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index b76374c..6ad51b5 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -74,4 +74,6 @@ message QueryContext {
required uint64 query_id = 13;
repeated uint64 num_partitions_for_nested_loops_joins = 14;
+
+ optional uint64 num_partitions_for_lips = 15 [default = 1];
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/QueryManagerDistributed.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 77a605e..a248391 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -92,6 +92,12 @@ QueryManagerDistributed::QueryManagerDistributed(QueryHandle *query_handle,
}
computeLipFilterEquivalenceClasses(query_context_proto);
+
+ const size_t num_partitions_for_lips = query_context_proto.num_partitions_for_lips();
+ for (const LipFilterGroupIndex lip_filter_groups_index : lip_filter_groups_indexes_) {
+ shiftboss_indexes_for_lip_filter_groups_.emplace(lip_filter_groups_index,
+ vector<size_t>(num_partitions_for_lips, kInvalidShiftbossIndex));
+ }
}
void QueryManagerDistributed::computeLipFilterEquivalenceClasses(
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/1c749c91/query_execution/QueryManagerDistributed.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerDistributed.hpp b/query_execution/QueryManagerDistributed.hpp
index 6490eb7..a021fdd 100644
--- a/query_execution/QueryManagerDistributed.hpp
+++ b/query_execution/QueryManagerDistributed.hpp
@@ -127,7 +127,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
return;
}
- getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+ getShiftbossIndexForLip(lip_filter_indexes, part_id, block_locator, block, next_shiftboss_index_to_schedule,
shiftboss_index);
shiftboss_indexes_for_aggrs_[aggr_state_index][part_id] = *shiftboss_index;
@@ -161,7 +161,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
return;
}
- getShiftbossIndexForLip(lip_filter_indexes, block_locator, block, next_shiftboss_index_to_schedule,
+ getShiftbossIndexForLip(lip_filter_indexes, part_id, block_locator, block, next_shiftboss_index_to_schedule,
shiftboss_index);
shiftboss_indexes_for_hash_joins_[join_hash_table_index][part_id] = *shiftboss_index;
@@ -173,28 +173,40 @@ class QueryManagerDistributed final : public QueryManagerBase {
* otherwise <next_shiftboss_index_to_schedule>.
*
* @param lip_filter_indexes The LIP filter indexes.
+ * @param part_id The partition ID.
* @param block_locator The BlockLocator to use.
* @param block The block id to feed BlockLocator for the locality info.
* @param next_shiftboss_index The index of Shiftboss to schedule a next WorkOrder.
* @param shiftboss_index The index of Shiftboss to schedule the WorkOrder.
**/
void getShiftbossIndexForLip(const std::vector<QueryContext::lip_filter_id> &lip_filter_indexes,
+ const partition_id part_id,
const BlockLocator &block_locator,
const block_id block,
const std::size_t next_shiftboss_index_to_schedule,
std::size_t *shiftboss_index) {
+#ifdef QUICKSTEP_DEBUG
+ if (!lip_filter_indexes.empty()) {
+ DCHECK_LT(lip_filter_indexes.front(), lip_filter_groups_indexes_.size());
+ const auto cit =
+ shiftboss_indexes_for_lip_filter_groups_.find(lip_filter_groups_indexes_[lip_filter_indexes.front()]);
+ DCHECK(cit != shiftboss_indexes_for_lip_filter_groups_.end());
+ DCHECK_LT(part_id, cit->second.size());
+ }
+#endif
+
if (!lip_filter_indexes.empty() &&
- shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]]
+ shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]][part_id]
!= kInvalidShiftbossIndex) {
*shiftboss_index =
- shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]];
+ shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]][part_id];
return;
} else if (!block_locator.getBlockLocalityInfo(block, shiftboss_index)) {
*shiftboss_index = next_shiftboss_index_to_schedule;
}
if (!lip_filter_indexes.empty()) {
- shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]] =
+ shiftboss_indexes_for_lip_filter_groups_[lip_filter_groups_indexes_[lip_filter_indexes.front()]][part_id] =
*shiftboss_index;
}
}
@@ -282,7 +294,7 @@ class QueryManagerDistributed final : public QueryManagerBase {
std::vector<LipFilterGroupIndex> lip_filter_groups_indexes_;
// From a LipFilterGroupIndex to its scheduled Shiftboss index.
- std::unordered_map<LipFilterGroupIndex, std::size_t> shiftboss_indexes_for_lip_filter_groups_;
+ std::unordered_map<LipFilterGroupIndex, std::vector<std::size_t>> shiftboss_indexes_for_lip_filter_groups_;
DISALLOW_COPY_AND_ASSIGN(QueryManagerDistributed);
};