You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/04/25 21:22:11 UTC
incubator-quickstep git commit: Assigned partition_id to
RebuildWorkOrder.
Repository: incubator-quickstep
Updated Branches:
refs/heads/master 30021acf8 -> 5ee3f11a5
Assigned partition_id to RebuildWorkOrder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/5ee3f11a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/5ee3f11a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/5ee3f11a
Branch: refs/heads/master
Commit: 5ee3f11a5f36d56b27df0e7df3a270852911d7a0
Parents: 30021ac
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sat Apr 22 21:09:19 2017 -0700
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Apr 24 17:57:41 2017 -0700
----------------------------------------------------------------------
query_execution/QueryManagerSingleNode.cpp | 4 +-
query_execution/Shiftboss.cpp | 4 +-
relational_operators/RebuildWorkOrder.hpp | 5 +++
storage/InsertDestination.cpp | 7 ++-
storage/InsertDestination.hpp | 58 +++++++++++++++----------
5 files changed, 51 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/query_execution/QueryManagerSingleNode.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index e3f349f..79c4026 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -174,12 +174,13 @@ void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
}
std::vector<MutableBlockReference> partially_filled_block_refs;
+ std::vector<partition_id> part_ids;
DCHECK(query_context_ != nullptr);
InsertDestination *insert_destination = query_context_->getInsertDestination(insert_destination_index);
DCHECK(insert_destination != nullptr);
- insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
+ insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs, &part_ids);
for (std::vector<MutableBlockReference>::size_type i = 0;
i < partially_filled_block_refs.size();
@@ -189,6 +190,7 @@ void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
std::move(partially_filled_block_refs[i]),
index,
op.getOutputRelationID(),
+ part_ids[i],
foreman_client_id_,
bus_),
index);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/query_execution/Shiftboss.cpp
----------------------------------------------------------------------
diff --git a/query_execution/Shiftboss.cpp b/query_execution/Shiftboss.cpp
index 72d8913..21e7858 100644
--- a/query_execution/Shiftboss.cpp
+++ b/query_execution/Shiftboss.cpp
@@ -405,7 +405,8 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
DCHECK(insert_destination != nullptr);
vector<MutableBlockReference> partially_filled_block_refs;
- insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs);
+ vector<partition_id> part_ids;
+ insert_destination->getPartiallyFilledBlocks(&partially_filled_block_refs, &part_ids);
serialization::InitiateRebuildResponseMessage proto;
proto.set_query_id(query_id);
@@ -439,6 +440,7 @@ void Shiftboss::processInitiateRebuildMessage(const std::size_t query_id,
move(partially_filled_block_refs[i]),
op_index,
rel_id,
+ part_ids[i],
shiftboss_client_id_local_,
bus_local_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 7f0f7fc..8615d74 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -57,6 +57,7 @@ class RebuildWorkOrder : public WorkOrder {
* query plan DAG that produced the output block.
* @param input_relation_id The ID of the CatalogRelation to which the given
* storage block belongs to.
+ * @param part_id The partition_id of the block, if any.
* @param scheduler_client_id The TMB client ID of the scheduler thread.
* @param bus A pointer to the TMB.
**/
@@ -65,12 +66,14 @@ class RebuildWorkOrder : public WorkOrder {
MutableBlockReference &&block_ref, // NOLINT(whitespace/operators)
const std::size_t input_operator_index,
const relation_id input_relation_id,
+ const partition_id part_id,
const client_id scheduler_client_id,
MessageBus *bus)
: WorkOrder(query_id),
block_ref_(std::move(block_ref)),
input_operator_index_(input_operator_index),
input_relation_id_(input_relation_id),
+ part_id_(part_id),
scheduler_client_id_(scheduler_client_id),
bus_(bus) {}
@@ -88,6 +91,7 @@ class RebuildWorkOrder : public WorkOrder {
proto.set_block_id(block_ref_->getID());
proto.set_relation_id(input_relation_id_);
proto.set_query_id(query_id_);
+ proto.set_partition_id(part_id_);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();
@@ -114,6 +118,7 @@ class RebuildWorkOrder : public WorkOrder {
MutableBlockReference block_ref_;
const std::size_t input_operator_index_;
const relation_id input_relation_id_;
+ const partition_id part_id_;
const client_id scheduler_client_id_;
MessageBus *bus_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index b6a9e3a..891b5a1 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -29,6 +29,7 @@
#include "catalog/Catalog.pb.h"
#include "catalog/CatalogAttribute.hpp"
#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionTypedefs.hpp"
@@ -421,11 +422,15 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
return storage_manager_->getBlockMutable(new_id, relation_);
}
-void BlockPoolInsertDestination::getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) {
+void BlockPoolInsertDestination::getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+ vector<partition_id> *part_ids) {
SpinMutexLock lock(mutex_);
for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_.size(); ++i) {
partial_blocks->push_back((std::move(available_block_refs_[i])));
+ // No partition.
+ part_ids->push_back(0u);
}
+
available_block_refs_.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/5ee3f11a/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index dc5a093..66c67d7 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -183,8 +183,11 @@ class InsertDestination : public InsertDestinationInterface {
*
* @param partial_blocks A pointer to the vector of block IDs in which the
* partially filled block IDs will be added.
+ * @param part_ids A pointer to the vector in which the partition_ids
+ * associated with the partially filled blocks will be added.
**/
- virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) = 0;
+ virtual void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+ std::vector<partition_id> *part_ids) = 0;
protected:
/**
@@ -335,7 +338,8 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
return returned_block_ids_;
}
- void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override {
+ void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+ std::vector<partition_id> *part_ids) override {
}
private:
@@ -421,7 +425,8 @@ class BlockPoolInsertDestination : public InsertDestination {
void returnBlock(MutableBlockReference &&block, const bool full) override;
- void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override;
+ void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+ std::vector<partition_id> *part_ids) override;
const std::vector<block_id>& getTouchedBlocksInternal() override;
@@ -488,33 +493,15 @@ class PartitionAwareInsertDestination : public InsertDestination {
available_block_ids_[part_id].push_back(bid);
}
- void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) override {
+ void getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks,
+ std::vector<partition_id> *part_ids) override {
// Iterate through each partition and return the partially filled blocks
// in each partition.
for (partition_id part_id = 0; part_id < partition_scheme_header_->getNumPartitions(); ++part_id) {
- getPartiallyFilledBlocksInPartition(partial_blocks, part_id);
+ getPartiallyFilledBlocksInPartition(partial_blocks, part_ids, part_id);
}
}
- /**
- * @brief Get the set of blocks that were partially filled by clients of this
- * InsertDestination for insertion.
- * @warning Should only be called AFTER this InsertDestination will no longer
- * be used, and all blocks have been returned to it via
- * returnBlock() and BEFORE getTouchedBlocks() is called, at all.
- *
- * @param partial_blocks A pointer to the vector of block IDs in which the
- * partially filled block IDs will be added.
- * @param part_id The partition id for which we want the partially filled blocks.
- **/
- void getPartiallyFilledBlocksInPartition(std::vector<MutableBlockReference> *partial_blocks, partition_id part_id) {
- SpinMutexLock lock(mutexes_for_partition_[part_id]);
- for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_[part_id].size(); ++i) {
- partial_blocks->push_back((std::move(available_block_refs_[part_id][i])));
- }
- available_block_refs_[part_id].clear();
- }
-
PartitionSchemeHeader::PartitionAttributeIds getPartitioningAttributes() const override;
void insertTuple(const Tuple &tuple) override;
@@ -572,6 +559,29 @@ class PartitionAwareInsertDestination : public InsertDestination {
const std::vector<block_id>& getTouchedBlocksInternalInPartition(partition_id part_id);
private:
+ /**
+ * @brief Get the set of blocks that were partially filled by clients of this
+ * InsertDestination for insertion.
+ * @warning Should only be called AFTER this InsertDestination will no longer
+ * be used, and all blocks have been returned to it via
+ * returnBlock() and BEFORE getTouchedBlocks() is called, at all.
+ *
+ * @param partial_blocks A pointer to the vector in which the partially filled blocks will be added.
+ * @param part_ids A pointer to the vector in which 'part_id' is added once per block added to 'partial_blocks'.
+ * @param part_id The partition id for which we want the partially filled blocks.
+ **/
+ void getPartiallyFilledBlocksInPartition(std::vector<MutableBlockReference> *partial_blocks,
+ std::vector<partition_id> *part_ids,
+ const partition_id part_id) {
+ SpinMutexLock lock(mutexes_for_partition_[part_id]);
+ for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_[part_id].size(); ++i) {
+ partial_blocks->push_back((std::move(available_block_refs_[part_id][i])));
+ part_ids->push_back(part_id);
+ }
+
+ available_block_refs_[part_id].clear();
+ }
+
std::unique_ptr<const PartitionSchemeHeader> partition_scheme_header_;
// A vector of available block references for each partition.