You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/05 06:16:53 UTC
[17/30] incubator-quickstep git commit: Partition aware selection
using NUMA-awareness (#175)
Partition aware selection using NUMA-awareness (#175)
Link: https://github.com/pivotalsoftware/quickstep/pull/175
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/52b758fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/52b758fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/52b758fc
Branch: refs/heads/master
Commit: 52b758fc8f8e4c88cc17b81a398f469ede594069
Parents: 6f4dd8f
Author: Adalbert Gerald Soosai Raj <ad...@gmail.com>
Authored: Sun May 1 15:38:38 2016 -0500
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Wed May 4 23:15:35 2016 -0700
----------------------------------------------------------------------
catalog/CMakeLists.txt | 5 +-
catalog/CatalogRelation.hpp | 9 ++
relational_operators/CMakeLists.txt | 5 ++
relational_operators/SelectOperator.cpp | 129 +++++++++++++++++++++------
relational_operators/SelectOperator.hpp | 115 +++++++++++++++++++++---
5 files changed, 226 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/catalog/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 8c89d7e..94da838 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -175,9 +175,12 @@ target_link_libraries(quickstep_catalog
quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
quickstep_catalog_IndexScheme
- quickstep_catalog_NUMAPlacementScheme
quickstep_catalog_PartitionScheme
quickstep_catalog_PartitionSchemeHeader)
+if(QUICKSTEP_HAVE_LIBNUMA)
+target_link_libraries(quickstep_catalog
+ quickstep_catalog_NUMAPlacementScheme)
+endif()
# Tests:
add_executable(Catalog_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/Catalog_unittest.cpp")
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/catalog/CatalogRelation.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelation.hpp b/catalog/CatalogRelation.hpp
index 4cc8d79..3701090 100644
--- a/catalog/CatalogRelation.hpp
+++ b/catalog/CatalogRelation.hpp
@@ -165,6 +165,15 @@ class CatalogRelation : public CatalogRelationSchema {
}
/**
+ * @brief Get the NUMA placement scheme of the relation.
+ *
+ * @return A pointer to a const NUMA placement scheme.
+ **/
+ const NUMAPlacementScheme* getNUMAPlacementSchemePtr() const {
+ return placement_scheme_.get();
+ }
+
+ /**
* @brief Set the NUMA placement scheme for the catalog relation.
*
* @param placement_scheme The NUMA placement scheme object for the relation,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index a4600e6..eec5300 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -266,6 +266,7 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
glog
quickstep_catalog_CatalogRelation
quickstep_catalog_CatalogTypedefs
+ quickstep_catalog_PartitionSchemeHeader
quickstep_queryexecution_QueryContext
quickstep_queryexecution_WorkOrdersContainer
quickstep_relationaloperators_RelationalOperator
@@ -276,6 +277,10 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
quickstep_storage_StorageManager
quickstep_utility_Macros
tmb)
+if(QUICKSTEP_HAVE_LIBNUMA)
+target_link_libraries(quickstep_relationaloperators_SelectOperator
+ quickstep_catalog_NUMAPlacementScheme)
+endif()
target_link_libraries(quickstep_relationaloperators_SortMergeRunOperator
glog
quickstep_catalog_CatalogRelation
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index 3cac199..69bb434 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -35,6 +35,93 @@ namespace quickstep {
class Predicate;
+void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
+ StorageManager *storage_manager,
+ const Predicate *predicate,
+ const std::vector<std::unique_ptr<const Scalar>> *selection,
+ InsertDestination *output_destination) {
+ if (input_relation_is_stored_) {
+ for (const block_id input_block_id : input_relation_block_ids_) {
+ container->addNormalWorkOrder(
+ new SelectWorkOrder(input_relation_,
+ input_block_id,
+ predicate,
+ simple_projection_,
+ simple_selection_,
+ selection,
+ output_destination,
+ storage_manager),
+ op_index_);
+ }
+ } else {
+ while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+ container->addNormalWorkOrder(
+ new SelectWorkOrder(
+ input_relation_,
+ input_relation_block_ids_[num_workorders_generated_],
+ predicate,
+ simple_projection_,
+ simple_selection_,
+ selection,
+ output_destination,
+ storage_manager),
+ op_index_);
+ ++num_workorders_generated_;
+ }
+ }
+}
+
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+ StorageManager *storage_manager,
+ const Predicate *predicate,
+ const std::vector<std::unique_ptr<const Scalar>> *selection,
+ InsertDestination *output_destination) {
+ DCHECK(placement_scheme_ != nullptr);
+ const std::size_t num_partitions = input_relation_.getPartitionScheme().getPartitionSchemeHeader().getNumPartitions();
+ if (input_relation_is_stored_) {
+ for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+ for (const block_id input_block_id :
+ input_relation_block_ids_in_partition_[part_id]) {
+ container->addNormalWorkOrder(
+ new SelectWorkOrder(
+ input_relation_,
+ input_block_id,
+ predicate,
+ simple_projection_,
+ simple_selection_,
+ selection,
+ output_destination,
+ storage_manager,
+ placement_scheme_->getNUMANodeForBlock(input_block_id)),
+ op_index_);
+ }
+ }
+ } else {
+ for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+ while (num_workorders_generated_in_partition_[part_id] <
+ input_relation_block_ids_in_partition_[part_id].size()) {
+ block_id block_in_partition
+ = input_relation_block_ids_in_partition_[part_id][num_workorders_generated_in_partition_[part_id]];
+ container->addNormalWorkOrder(
+ new SelectWorkOrder(
+ input_relation_,
+ block_in_partition,
+ predicate,
+ simple_projection_,
+ simple_selection_,
+ selection,
+ output_destination,
+ storage_manager,
+ placement_scheme_->getNUMANodeForBlock(block_in_partition)),
+ op_index_);
+ ++num_workorders_generated_in_partition_[part_id];
+ }
+ }
+ }
+}
+#endif
+
bool SelectOperator::getAllWorkOrders(
WorkOrdersContainer *container,
QueryContext *query_context,
@@ -54,35 +141,27 @@ bool SelectOperator::getAllWorkOrders(
if (input_relation_is_stored_) {
if (!started_) {
- for (const block_id input_block_id : input_relation_block_ids_) {
- container->addNormalWorkOrder(
- new SelectWorkOrder(input_relation_,
- input_block_id,
- predicate,
- simple_projection_,
- simple_selection_,
- selection,
- output_destination,
- storage_manager),
- op_index_);
+ if (input_relation_.hasPartitionScheme()) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+ if (input_relation_.hasNUMAPlacementScheme()) {
+ addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ }
+#endif
+ } else {
+ addWorkOrders(container, storage_manager, predicate, selection, output_destination);
}
started_ = true;
}
return started_;
} else {
- while (num_workorders_generated_ < input_relation_block_ids_.size()) {
- container->addNormalWorkOrder(
- new SelectWorkOrder(
- input_relation_,
- input_relation_block_ids_[num_workorders_generated_],
- predicate,
- simple_projection_,
- simple_selection_,
- selection,
- output_destination,
- storage_manager),
- op_index_);
- ++num_workorders_generated_;
+ if (input_relation_.hasPartitionScheme()) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+ if (input_relation_.hasNUMAPlacementScheme()) {
+ addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+ }
+#endif
+ } else {
+ addWorkOrders(container, storage_manager, predicate, selection, output_destination);
}
return done_feeding_input_relation_;
}
@@ -90,7 +169,7 @@ bool SelectOperator::getAllWorkOrders(
void SelectWorkOrder::execute() {
BlockReference block(
- storage_manager_->getBlock(input_block_id_, input_relation_));
+ storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));
if (simple_projection_) {
block->selectSimple(simple_selection_,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 3da496c..76f4cb6 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -24,6 +24,12 @@
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
+
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+#include "catalog/NUMAPlacementScheme.hpp"
+#endif
+
+#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
@@ -87,7 +93,28 @@ class SelectOperator : public RelationalOperator {
num_workorders_generated_(0),
simple_projection_(false),
input_relation_is_stored_(input_relation_is_stored),
- started_(false) {}
+ started_(false) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+ placement_scheme_ = input_relation.getNUMAPlacementSchemePtr();
+#endif
+ if (input_relation.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = input_relation.getPartitionScheme();
+ const PartitionSchemeHeader &part_scheme_header = part_scheme.getPartitionSchemeHeader();
+ const std::size_t num_partitions = part_scheme_header.getNumPartitions();
+ input_relation_block_ids_in_partition_.resize(num_partitions);
+ num_workorders_generated_in_partition_.resize(num_partitions);
+ num_workorders_generated_in_partition_.assign(num_partitions, 0);
+ for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+ if (input_relation_is_stored) {
+ input_relation_block_ids_in_partition_[part_id] =
+ part_scheme.getBlocksInPartition(part_id);
+ } else {
+ input_relation_block_ids_in_partition_[part_id] =
+ std::vector<block_id>();
+ }
+ }
+ }
+ }
/**
* @brief Constructor for selection with simple projection of attributes.
@@ -124,7 +151,28 @@ class SelectOperator : public RelationalOperator {
num_workorders_generated_(0),
simple_projection_(true),
input_relation_is_stored_(input_relation_is_stored),
- started_(false) {}
+ started_(false) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+ placement_scheme_ = input_relation.getNUMAPlacementSchemePtr();
+#endif
+ if (input_relation.hasPartitionScheme()) {
+ const PartitionScheme &part_scheme = input_relation.getPartitionScheme();
+ const PartitionSchemeHeader &part_scheme_header = part_scheme.getPartitionSchemeHeader();
+ const std::size_t num_partitions = part_scheme_header.getNumPartitions();
+ input_relation_block_ids_in_partition_.resize(num_partitions);
+ num_workorders_generated_in_partition_.resize(num_partitions);
+ num_workorders_generated_in_partition_.assign(num_partitions, 0);
+ for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+ if (input_relation_is_stored) {
+ input_relation_block_ids_in_partition_[part_id] =
+ part_scheme.getBlocksInPartition(part_id);
+ } else {
+ input_relation_block_ids_in_partition_[part_id] =
+ std::vector<block_id>();
+ }
+ }
+ }
+ }
~SelectOperator() override {}
@@ -135,13 +183,33 @@ class SelectOperator : public RelationalOperator {
tmb::MessageBus *bus) override;
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
- input_relation_block_ids_.push_back(input_block_id);
+ if (input_relation_.hasPartitionScheme()) {
+ const partition_id part_id =
+ input_relation_.getPartitionScheme().getPartitionForBlock(input_block_id);
+ input_relation_block_ids_in_partition_[part_id].push_back(input_block_id);
+ } else {
+ input_relation_block_ids_.push_back(input_block_id);
+ }
}
+ // TODO(gerald): Each call to getPartitionForBlock() involves grabbing shared
+ // locks on each partition's mutex and checking if the block belongs to the
+ // partition. Instead, we can provide a method getPartitionsForBlocks() which
+ // accepts a list of blocks and returns the corresponding list of partition IDs.
+ // That way, once we grab the lock for a partition, we can search for all the
+ // blocks at once and then release the lock.
void feedInputBlocks(const relation_id rel_id, std::vector<block_id> *partially_filled_blocks) override {
- input_relation_block_ids_.insert(input_relation_block_ids_.end(),
- partially_filled_blocks->begin(),
- partially_filled_blocks->end());
+ if (input_relation_.hasPartitionScheme()) {
+ for (auto it = partially_filled_blocks->begin(); it != partially_filled_blocks->end(); ++it) {
+ const partition_id part_id = input_relation_.getPartitionScheme().getPartitionForBlock((*it));
+ input_relation_block_ids_in_partition_[part_id].insert(input_relation_block_ids_in_partition_[part_id].end(),
+ *it);
+ }
+ } else {
+ input_relation_block_ids_.insert(input_relation_block_ids_.end(),
+ partially_filled_blocks->begin(),
+ partially_filled_blocks->end());
+ }
}
QueryContext::insert_destination_id getInsertDestinationID() const override {
@@ -152,9 +220,20 @@ class SelectOperator : public RelationalOperator {
return output_relation_.getID();
}
+ void addWorkOrders(WorkOrdersContainer *container,
+ StorageManager *storage_manager,
+ const Predicate *predicate,
+ const std::vector<std::unique_ptr<const Scalar>> *selection,
+ InsertDestination *output_destination);
+
+ void addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+ StorageManager *storage_manager,
+ const Predicate *predicate,
+ const std::vector<std::unique_ptr<const Scalar>> *selection,
+ InsertDestination *output_destination);
+
private:
const CatalogRelation &input_relation_;
-
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
const QueryContext::predicate_id predicate_index_;
@@ -163,12 +242,20 @@ class SelectOperator : public RelationalOperator {
const std::vector<attribute_id> simple_selection_;
std::vector<block_id> input_relation_block_ids_;
+ // A vector of vectors V where V[i] indicates the list of block IDs of the
+ // input relation that belong to the partition i.
+ std::vector<std::vector<block_id>> input_relation_block_ids_in_partition_;
// A single workorder is generated for each block of input relation.
std::vector<block_id>::size_type num_workorders_generated_;
+ // A single workorder is generated for each block in each partition of input relation.
+ std::vector<std::size_t> num_workorders_generated_in_partition_;
const bool simple_projection_;
const bool input_relation_is_stored_;
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+ const NUMAPlacementScheme *placement_scheme_;
+#endif
bool started_;
DISALLOW_COPY_AND_ASSIGN(SelectOperator);
@@ -205,7 +292,8 @@ class SelectWorkOrder : public WorkOrder {
const std::vector<attribute_id> &simple_selection,
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ const numa_node_id numa_node = 0)
: input_relation_(input_relation),
input_block_id_(input_block_id),
predicate_(predicate),
@@ -213,7 +301,9 @@ class SelectWorkOrder : public WorkOrder {
simple_selection_(simple_selection),
selection_(selection),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ preferred_numa_nodes_.push_back(numa_node);
+ }
/**
* @brief Constructor for the distributed version.
@@ -241,7 +331,8 @@ class SelectWorkOrder : public WorkOrder {
std::vector<attribute_id> &&simple_selection,
const std::vector<std::unique_ptr<const Scalar>> *selection,
InsertDestination *output_destination,
- StorageManager *storage_manager)
+ StorageManager *storage_manager,
+ const numa_node_id numa_node = 0)
: input_relation_(input_relation),
input_block_id_(input_block_id),
predicate_(predicate),
@@ -249,7 +340,9 @@ class SelectWorkOrder : public WorkOrder {
simple_selection_(std::move(simple_selection)),
selection_(selection),
output_destination_(DCHECK_NOTNULL(output_destination)),
- storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+ storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+ preferred_numa_nodes_.push_back(numa_node);
+ }
~SelectWorkOrder() override {}