You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/05/05 06:16:53 UTC

[17/30] incubator-quickstep git commit: Partition aware selection using NUMA-awareness (#175)

Partition aware selection using NUMA-awareness (#175)

Link: https://github.com/pivotalsoftware/quickstep/pull/175

Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/52b758fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/52b758fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/52b758fc

Branch: refs/heads/master
Commit: 52b758fc8f8e4c88cc17b81a398f469ede594069
Parents: 6f4dd8f
Author: Adalbert Gerald Soosai Raj <ad...@gmail.com>
Authored: Sun May 1 15:38:38 2016 -0500
Committer: Zuyu Zhang <zz...@pivotal.io>
Committed: Wed May 4 23:15:35 2016 -0700

----------------------------------------------------------------------
 catalog/CMakeLists.txt                  |   5 +-
 catalog/CatalogRelation.hpp             |   9 ++
 relational_operators/CMakeLists.txt     |   5 ++
 relational_operators/SelectOperator.cpp | 129 +++++++++++++++++++++------
 relational_operators/SelectOperator.hpp | 115 +++++++++++++++++++++---
 5 files changed, 226 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/catalog/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 8c89d7e..94da838 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -175,9 +175,12 @@ target_link_libraries(quickstep_catalog
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_IndexScheme
-                      quickstep_catalog_NUMAPlacementScheme
                       quickstep_catalog_PartitionScheme
                       quickstep_catalog_PartitionSchemeHeader)
+if(QUICKSTEP_HAVE_LIBNUMA)
+target_link_libraries(quickstep_catalog
+                      quickstep_catalog_NUMAPlacementScheme)
+endif()
 
 # Tests:
 add_executable(Catalog_unittest "${CMAKE_CURRENT_SOURCE_DIR}/tests/Catalog_unittest.cpp")

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/catalog/CatalogRelation.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelation.hpp b/catalog/CatalogRelation.hpp
index 4cc8d79..3701090 100644
--- a/catalog/CatalogRelation.hpp
+++ b/catalog/CatalogRelation.hpp
@@ -165,6 +165,15 @@ class CatalogRelation : public CatalogRelationSchema {
   }
 
   /**
+   * @brief Get the NUMA placement scheme of the relation.
+   *
+   * @return A pointer to a const NUMA placement scheme.
+   **/
+  const NUMAPlacementScheme* getNUMAPlacementSchemePtr() const {
+    return placement_scheme_.get();
+  }
+
+  /**
    * @brief Set the NUMA placement scheme for the catalog relation.
    *
    * @param placement_scheme The NUMA placement scheme object for the relation,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index a4600e6..eec5300 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -266,6 +266,7 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
                       glog
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
@@ -276,6 +277,10 @@ target_link_libraries(quickstep_relationaloperators_SelectOperator
                       quickstep_storage_StorageManager
                       quickstep_utility_Macros
                       tmb)
+if(QUICKSTEP_HAVE_LIBNUMA)
+target_link_libraries(quickstep_relationaloperators_SelectOperator
+                      quickstep_catalog_NUMAPlacementScheme)
+endif()
 target_link_libraries(quickstep_relationaloperators_SortMergeRunOperator
                       glog
                       quickstep_catalog_CatalogRelation

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index 3cac199..69bb434 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -35,6 +35,93 @@ namespace quickstep {
 
 class Predicate;
 
+void SelectOperator::addWorkOrders(WorkOrdersContainer *container,
+                                   StorageManager *storage_manager,
+                                   const Predicate *predicate,
+                                   const std::vector<std::unique_ptr<const Scalar>> *selection,
+                                   InsertDestination *output_destination) {
+  if (input_relation_is_stored_) {
+    for (const block_id input_block_id : input_relation_block_ids_) {
+      container->addNormalWorkOrder(
+          new SelectWorkOrder(input_relation_,
+                              input_block_id,
+                              predicate,
+                              simple_projection_,
+                              simple_selection_,
+                              selection,
+                              output_destination,
+                              storage_manager),
+          op_index_);
+    }
+  } else {
+    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+      container->addNormalWorkOrder(
+          new SelectWorkOrder(
+              input_relation_,
+              input_relation_block_ids_[num_workorders_generated_],
+              predicate,
+              simple_projection_,
+              simple_selection_,
+              selection,
+              output_destination,
+              storage_manager),
+          op_index_);
+      ++num_workorders_generated_;
+    }
+  }
+}
+
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+void SelectOperator::addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+                                                 StorageManager *storage_manager,
+                                                 const Predicate *predicate,
+                                                 const std::vector<std::unique_ptr<const Scalar>> *selection,
+                                                 InsertDestination *output_destination) {
+  DCHECK(placement_scheme_ != nullptr);
+  const std::size_t num_partitions = input_relation_.getPartitionScheme().getPartitionSchemeHeader().getNumPartitions();
+  if (input_relation_is_stored_) {
+    for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+      for (const block_id input_block_id :
+           input_relation_block_ids_in_partition_[part_id]) {
+        container->addNormalWorkOrder(
+            new SelectWorkOrder(
+                input_relation_,
+                input_block_id,
+                predicate,
+                simple_projection_,
+                simple_selection_,
+                selection,
+                output_destination,
+                storage_manager,
+                placement_scheme_->getNUMANodeForBlock(input_block_id)),
+            op_index_);
+      }
+    }
+  } else {
+    for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+      while (num_workorders_generated_in_partition_[part_id] <
+             input_relation_block_ids_in_partition_[part_id].size()) {
+        block_id block_in_partition
+            = input_relation_block_ids_in_partition_[part_id][num_workorders_generated_in_partition_[part_id]];
+        container->addNormalWorkOrder(
+            new SelectWorkOrder(
+                input_relation_,
+                block_in_partition,
+                predicate,
+                simple_projection_,
+                simple_selection_,
+                selection,
+                output_destination,
+                storage_manager,
+                placement_scheme_->getNUMANodeForBlock(block_in_partition)),
+            op_index_);
+        ++num_workorders_generated_in_partition_[part_id];
+      }
+    }
+  }
+}
+#endif
+
 bool SelectOperator::getAllWorkOrders(
     WorkOrdersContainer *container,
     QueryContext *query_context,
@@ -54,35 +141,27 @@ bool SelectOperator::getAllWorkOrders(
 
   if (input_relation_is_stored_) {
     if (!started_) {
-      for (const block_id input_block_id : input_relation_block_ids_) {
-        container->addNormalWorkOrder(
-            new SelectWorkOrder(input_relation_,
-                                input_block_id,
-                                predicate,
-                                simple_projection_,
-                                simple_selection_,
-                                selection,
-                                output_destination,
-                                storage_manager),
-            op_index_);
+      if (input_relation_.hasPartitionScheme()) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+        if (input_relation_.hasNUMAPlacementScheme()) {
+          addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+        }
+#endif
+      } else {
+        addWorkOrders(container, storage_manager, predicate, selection, output_destination);
       }
       started_ = true;
     }
     return started_;
   } else {
-    while (num_workorders_generated_ < input_relation_block_ids_.size()) {
-      container->addNormalWorkOrder(
-          new SelectWorkOrder(
-              input_relation_,
-              input_relation_block_ids_[num_workorders_generated_],
-              predicate,
-              simple_projection_,
-              simple_selection_,
-              selection,
-              output_destination,
-              storage_manager),
-          op_index_);
-      ++num_workorders_generated_;
+    if (input_relation_.hasPartitionScheme()) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+        if (input_relation_.hasNUMAPlacementScheme()) {
+          addPartitionAwareWorkOrders(container, storage_manager, predicate, selection, output_destination);
+        }
+#endif
+    } else {
+        addWorkOrders(container, storage_manager, predicate, selection, output_destination);
     }
     return done_feeding_input_relation_;
   }
@@ -90,7 +169,7 @@ bool SelectOperator::getAllWorkOrders(
 
 void SelectWorkOrder::execute() {
   BlockReference block(
-      storage_manager_->getBlock(input_block_id_, input_relation_));
+      storage_manager_->getBlock(input_block_id_, input_relation_, getPreferredNUMANodes()[0]));
 
   if (simple_projection_) {
     block->selectSimple(simple_selection_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/52b758fc/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 3da496c..76f4cb6 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -24,6 +24,12 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+#include "catalog/NUMAPlacementScheme.hpp"
+#endif
+
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -87,7 +93,28 @@ class SelectOperator : public RelationalOperator {
         num_workorders_generated_(0),
         simple_projection_(false),
         input_relation_is_stored_(input_relation_is_stored),
-        started_(false) {}
+        started_(false) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+    placement_scheme_ = input_relation.getNUMAPlacementSchemePtr();
+#endif
+    if (input_relation.hasPartitionScheme()) {
+      const PartitionScheme &part_scheme = input_relation.getPartitionScheme();
+      const PartitionSchemeHeader &part_scheme_header = part_scheme.getPartitionSchemeHeader();
+      const std::size_t num_partitions = part_scheme_header.getNumPartitions();
+      input_relation_block_ids_in_partition_.resize(num_partitions);
+      num_workorders_generated_in_partition_.resize(num_partitions);
+      num_workorders_generated_in_partition_.assign(num_partitions, 0);
+      for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+        if (input_relation_is_stored) {
+          input_relation_block_ids_in_partition_[part_id] =
+              part_scheme.getBlocksInPartition(part_id);
+        } else {
+          input_relation_block_ids_in_partition_[part_id] =
+              std::vector<block_id>();
+        }
+      }
+    }
+  }
 
   /**
    * @brief Constructor for selection with simple projection of attributes.
@@ -124,7 +151,28 @@ class SelectOperator : public RelationalOperator {
         num_workorders_generated_(0),
         simple_projection_(true),
         input_relation_is_stored_(input_relation_is_stored),
-        started_(false) {}
+        started_(false) {
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+    placement_scheme_ = input_relation.getNUMAPlacementSchemePtr();
+#endif
+    if (input_relation.hasPartitionScheme()) {
+      const PartitionScheme &part_scheme = input_relation.getPartitionScheme();
+      const PartitionSchemeHeader &part_scheme_header = part_scheme.getPartitionSchemeHeader();
+      const std::size_t num_partitions = part_scheme_header.getNumPartitions();
+      input_relation_block_ids_in_partition_.resize(num_partitions);
+      num_workorders_generated_in_partition_.resize(num_partitions);
+      num_workorders_generated_in_partition_.assign(num_partitions, 0);
+      for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
+        if (input_relation_is_stored) {
+          input_relation_block_ids_in_partition_[part_id] =
+              part_scheme.getBlocksInPartition(part_id);
+        } else {
+          input_relation_block_ids_in_partition_[part_id] =
+              std::vector<block_id>();
+        }
+      }
+    }
+  }
 
   ~SelectOperator() override {}
 
@@ -135,13 +183,33 @@ class SelectOperator : public RelationalOperator {
                         tmb::MessageBus *bus) override;
 
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
-    input_relation_block_ids_.push_back(input_block_id);
+    if (input_relation_.hasPartitionScheme()) {
+      const partition_id part_id =
+          input_relation_.getPartitionScheme().getPartitionForBlock(input_block_id);
+      input_relation_block_ids_in_partition_[part_id].push_back(input_block_id);
+    } else {
+      input_relation_block_ids_.push_back(input_block_id);
+    }
   }
 
+  // TODO(gerald): Each call to getPartitionForBlock() involves grabbing a
+  // shared lock on each partition's mutex and checking whether the block
+  // belongs to that partition. Instead, we could provide a method
+  // getPartitionsForBlocks() that accepts a list of blocks and returns the
+  // corresponding list of their partition IDs. That way, once we grab the lock
+  // for a partition, we can search for all the blocks and then release the lock.
   void feedInputBlocks(const relation_id rel_id, std::vector<block_id> *partially_filled_blocks) override {
-    input_relation_block_ids_.insert(input_relation_block_ids_.end(),
-                                     partially_filled_blocks->begin(),
-                                     partially_filled_blocks->end());
+    if (input_relation_.hasPartitionScheme()) {
+      for (auto it = partially_filled_blocks->begin(); it != partially_filled_blocks->end(); ++it) {
+        const partition_id part_id = input_relation_.getPartitionScheme().getPartitionForBlock((*it));
+        input_relation_block_ids_in_partition_[part_id].insert(input_relation_block_ids_in_partition_[part_id].end(),
+                                                               *it);
+      }
+    } else {
+      input_relation_block_ids_.insert(input_relation_block_ids_.end(),
+                                       partially_filled_blocks->begin(),
+                                       partially_filled_blocks->end());
+    }
   }
 
   QueryContext::insert_destination_id getInsertDestinationID() const override {
@@ -152,9 +220,20 @@ class SelectOperator : public RelationalOperator {
     return output_relation_.getID();
   }
 
+  void addWorkOrders(WorkOrdersContainer *container,
+                     StorageManager *storage_manager,
+                     const Predicate *predicate,
+                     const std::vector<std::unique_ptr<const Scalar>> *selection,
+                     InsertDestination *output_destination);
+
+  void addPartitionAwareWorkOrders(WorkOrdersContainer *container,
+                                   StorageManager *storage_manager,
+                                   const Predicate *predicate,
+                                   const std::vector<std::unique_ptr<const Scalar>> *selection,
+                                   InsertDestination *output_destination);
+
  private:
   const CatalogRelation &input_relation_;
-
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   const QueryContext::predicate_id predicate_index_;
@@ -163,12 +242,20 @@ class SelectOperator : public RelationalOperator {
   const std::vector<attribute_id> simple_selection_;
 
   std::vector<block_id> input_relation_block_ids_;
+  // A vector of vectors V where V[i] indicates the list of block IDs of the
+  // input relation that belong to the partition i.
+  std::vector<std::vector<block_id>> input_relation_block_ids_in_partition_;
 
   // A single workorder is generated for each block of input relation.
   std::vector<block_id>::size_type num_workorders_generated_;
+  // A single workorder is generated for each block in each partition of input relation.
+  std::vector<std::size_t> num_workorders_generated_in_partition_;
 
   const bool simple_projection_;
   const bool input_relation_is_stored_;
+#ifdef QUICKSTEP_HAVE_LIBNUMA
+  const NUMAPlacementScheme *placement_scheme_;
+#endif
   bool started_;
 
   DISALLOW_COPY_AND_ASSIGN(SelectOperator);
@@ -205,7 +292,8 @@ class SelectWorkOrder : public WorkOrder {
                   const std::vector<attribute_id> &simple_selection,
                   const std::vector<std::unique_ptr<const Scalar>> *selection,
                   InsertDestination *output_destination,
-                  StorageManager *storage_manager)
+                  StorageManager *storage_manager,
+                  const numa_node_id numa_node = 0)
       : input_relation_(input_relation),
         input_block_id_(input_block_id),
         predicate_(predicate),
@@ -213,7 +301,9 @@ class SelectWorkOrder : public WorkOrder {
         simple_selection_(simple_selection),
         selection_(selection),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    preferred_numa_nodes_.push_back(numa_node);
+  }
 
   /**
    * @brief Constructor for the distributed version.
@@ -241,7 +331,8 @@ class SelectWorkOrder : public WorkOrder {
                   std::vector<attribute_id> &&simple_selection,
                   const std::vector<std::unique_ptr<const Scalar>> *selection,
                   InsertDestination *output_destination,
-                  StorageManager *storage_manager)
+                  StorageManager *storage_manager,
+                  const numa_node_id numa_node = 0)
       : input_relation_(input_relation),
         input_block_id_(input_block_id),
         predicate_(predicate),
@@ -249,7 +340,9 @@ class SelectWorkOrder : public WorkOrder {
         simple_selection_(std::move(simple_selection)),
         selection_(selection),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {
+    preferred_numa_nodes_.push_back(numa_node);
+  }
 
   ~SelectWorkOrder() override {}