You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/04/25 21:22:11 UTC

incubator-quickstep git commit: Assigned partition_id to RebuildWorkOrder.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master 30021acf8 -> 5ee3f11a5


Assigned partition_id to RebuildWorkOrder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ee3f11a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ee3f11a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ee3f11a

Branch: refs/heads/master
Commit: 5ee3f11a5f36d56b27df0e7df3a270852911d7a0
Parents: 30021ac
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Apr 22 21:09:19 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Apr 24 17:57:41 2017 -0700

----------------------------------------------------------------------
 query_execution/QueryManagerSingleNode.cpp |  4 +-
 query_execution/Shiftboss.cpp              |  4 +-
 relational_operators/RebuildWorkOrder.hpp  |  5 +++
 storage/InsertDestination.cpp              |  7 ++-
 storage/InsertDestination.hpp              | 58 +++++++++++++++----------
 5 files changed, 51 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index e3f349f..79c4026 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -174,12 +174,13 @@ void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
   }
 
   std::vector<MutableBlockReference> partially_filled_block_refs;
+  std::vector<partition_id> part_ids;
 
   DCHECK(query_context_ != nullptr);
   InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
   DCHECK(insert_destination != nullptr);
 
-  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
+  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs, &part_ids);
 
   for (std::vector<MutableBlockReference>::size_type i = 0;
        i < partially_filled_block_refs.size();
@@ -189,6 +190,7 @@ void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
                              std::move(partially_filled_block_refs[i]),
                              index,
                              op.getOutputRelationID(),
+                             part_ids[i],
                              foreman_client_id_,
                              bus_),
         index);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 72d8913..21e7858 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -405,7 +405,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
   DCHECK(insert_destination != nullptr);
 
   vector<MutableBlockReference> partially_filled_block_refs;
-  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
+  vector<partition_id> part_ids;
+  insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs, &part_ids);
 
   serialization::InitiateRebuildResponseMessage proto;
   proto.set_query_id(query_id);
@@ -439,6 +440,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
                              move(partially_filled_block_refs[i]),
                              op_index,
                              rel_id,
+                             part_ids[i],
                              shiftboss_client_id_local_,
                              bus_local_);
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 7f0f7fc..8615d74 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -57,6 +57,7 @@ class RebuildWorkOrder : public WorkOrder {
    *        query plan DAG that produced the output block.
    * @param input_relation_id The ID of the CatalogRelation to which the given
    *        storage block belongs to.
+   * @param part_id The partition_id of the block, or 0 if the output is not partitioned.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
@@ -65,12 +66,14 @@ class RebuildWorkOrder : public WorkOrder {
       MutableBlockReference &&block_ref,  // NOLINT(whitespace/operators)
       const std::size_t input_operator_index,
       const relation_id input_relation_id,
+      const partition_id part_id,
       const client_id scheduler_client_id,
       MessageBus *bus)
       : WorkOrder(query_id),
         block_ref_(std::move(block_ref)),
         input_operator_index_(input_operator_index),
         input_relation_id_(input_relation_id),
+        part_id_(part_id),
         scheduler_client_id_(scheduler_client_id),
         bus_(bus) {}
 
@@ -88,6 +91,7 @@ class RebuildWorkOrder : public WorkOrder {
     proto.set_block_id(block_ref_->getID());
     proto.set_relation_id(input_relation_id_);
     proto.set_query_id(query_id_);
+    proto.set_partition_id(part_id_);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -114,6 +118,7 @@ class RebuildWorkOrder : public WorkOrder {
   MutableBlockReference block_ref_;
   const std::size_t input_operator_index_;
   const relation_id input_relation_id_;
+  const partition_id part_id_;
   const client_id scheduler_client_id_;
 
   MessageBus *bus_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index b6a9e3a..891b5a1 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -29,6 +29,7 @@
 #include "catalog/Catalog.pb.h"
 #include "catalog/CatalogAttribute.hpp"
 #include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
 #include "catalog/PartitionSchemeHeader.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionTypedefs.hpp"
@@ -421,11 +422,15 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
   return storage_manager_->getBlockMutable(new_id, relation_);
 }
 
-void BlockPoolInsertDestination::getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) {
+void BlockPoolInsertDestination::getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+                                                          vector<partition_id> *part_ids) {
   SpinMutexLock lock(mutex_);
   for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_.size(); ++i) {
     partial_blocks->push_back((std::move(available_block_refs_[i])));
+    // No partition.
+    part_ids->push_back(0u);
   }
+
   available_block_refs_.clear();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index dc5a093..66c67d7 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -183,8 +183,11 @@ class InsertDestination : public InsertDestinationInterface {
    *
    * @param partial_blocks A pointer to the vector of block IDs in which the
    *                       partially filled block IDs will be added.
+   * @param part_ids A pointer to the vector in which the partition_id of each
+   *                 partially filled block will be added, in the same order.
    **/
-  virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) = 0;
+  virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+                                        std::vector<partition_id> *part_ids) = 0;
 
  protected:
   /**
@@ -335,7 +338,8 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
     return returned_block_ids_;
   }
 
-  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override {
+  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+                                std::vector<partition_id> *part_ids) override {
   }
 
  private:
@@ -421,7 +425,8 @@ class BlockPoolInsertDestination : public InsertDestination {
 
   void returnBlock(MutableBlockReference &&block, const bool full) override;
 
-  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override;
+  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+                                std::vector<partition_id> *part_ids) override;
 
   const std::vector<block_id>& getTouchedBlocksInternal() override;
 
@@ -488,33 +493,15 @@ class PartitionAwareInsertDestination : public InsertDestination {
     available_block_ids_[part_id].push_back(bid);
   }
 
-  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override {
+  void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+                                std::vector<partition_id> *part_ids) override {
     // Iterate through each partition and return the partially filled blocks
     // in each partition.
     for (partition_id part_id = 0; part_id < partition_scheme_header_->getNumPartitions(); ++part_id) {
-      getPartiallyFilledBlocksInPartition(partial_blocks, part_id);
+      getPartiallyFilledBlocksInPartition(partial_blocks, part_ids, part_id);
     }
   }
 
-  /**
-   * @brief Get the set of blocks that were partially filled by clients of this
-   *        InsertDestination for insertion.
-   * @warning Should only be called AFTER this InsertDestination will no longer
-   *          be used, and all blocks have been returned to it via
-   *          returnBlock() and BEFORE getTouchedBlocks() is called, at all.
-   *
-   * @param partial_blocks A pointer to the vector of block IDs in which the
-   *                       partially filled block IDs will be added.
-   * @param part_id The partition id for which we want the partially filled blocks.
-   **/
-  void getPartiallyFilledBlocksInPartition(std::vector<MutableBlockReference> *partial_blocks, partition_id part_id) {
-    SpinMutexLock lock(mutexes_for_partition_[part_id]);
-    for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_[part_id].size(); ++i) {
-      partial_blocks->push_back((std::move(available_block_refs_[part_id][i])));
-    }
-    available_block_refs_[part_id].clear();
-  }
-
   PartitionSchemeHeader::PartitionAttributeIds getPartitioningAttributes() const override;
 
   void insertTuple(const Tuple &tuple) override;
@@ -572,6 +559,29 @@ class PartitionAwareInsertDestination : public InsertDestination {
   const std::vector<block_id>& getTouchedBlocksInternalInPartition(partition_id part_id);
 
  private:
+  /**
+   * @brief Get the set of blocks that were partially filled by clients of this
+   *        InsertDestination for insertion.
+   * @warning Should only be called AFTER this InsertDestination will no longer
+   *          be used, and all blocks have been returned to it via
+   *          returnBlock() and BEFORE getTouchedBlocks() is called, at all.
+   *
+   * @param partial_blocks A pointer to the vector of block IDs in which the
+   *                       partially filled block IDs will be added.
+   * @param part_id The partition id for which we want the partially filled blocks.
+   **/
+  void getPartiallyFilledBlocksInPartition(std::vector<MutableBlockReference> *partial_blocks,
+                                           std::vector<partition_id> *part_ids,
+                                           const partition_id part_id) {
+    SpinMutexLock lock(mutexes_for_partition_[part_id]);
+    for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_[part_id].size(); ++i) {
+      partial_blocks->push_back((std::move(available_block_refs_[part_id][i])));
+      part_ids->push_back(part_id);
+    }
+
+    available_block_refs_[part_id].clear();
+  }
+
   std::unique_ptr<const PartitionSchemeHeader> partition_scheme_header_;
 
   // A vector of available block references for each partition.