You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/17 21:56:22 UTC
[1/2] incubator-quickstep git commit:
CatalogRelation::getPartitionScheme returns a pointer instead of ref. [Forced
Update!]
Repository: incubator-quickstep
Updated Branches:
refs/heads/feedInputBlock-part-id b553e0115 -> 658b2924c (forced update)
CatalogRelation::getPartitionScheme returns a pointer instead of ref.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/e26cc1e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/e26cc1e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/e26cc1e0
Branch: refs/heads/feedInputBlock-part-id
Commit: e26cc1e037a7a5d07664b99ac2c178b0e8e8bc66
Parents: b0e5968
Author: Zuyu Zhang <zu...@apache.org>
Authored: Tue Jan 17 13:55:24 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Jan 17 13:55:24 2017 -0800
----------------------------------------------------------------------
catalog/CatalogRelation.hpp | 7 +++----
relational_operators/SelectOperator.cpp | 4 ++--
relational_operators/SelectOperator.hpp | 6 +++---
storage/PreloaderThread.cpp | 2 +-
4 files changed, 9 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e26cc1e0/catalog/CatalogRelation.hpp
----------------------------------------------------------------------
diff --git a/catalog/CatalogRelation.hpp b/catalog/CatalogRelation.hpp
index c38e526..e1fd79a 100644
--- a/catalog/CatalogRelation.hpp
+++ b/catalog/CatalogRelation.hpp
@@ -111,12 +111,11 @@ class CatalogRelation : public CatalogRelationSchema {
/**
* @brief Get the partition scheme of the catalog relation.
- * @warning This is only safe if hasPartitionScheme() is true.
*
- * @return A const reference to the partition scheme of the relation.
+ * @return A const pointer to the partition scheme of the relation.
**/
- const PartitionScheme& getPartitionScheme() const {
- return *partition_scheme_;
+ const PartitionScheme* getPartitionScheme() const {
+ return partition_scheme_.get();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e26cc1e0/relational_operators/SelectOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.cpp b/relational_operators/SelectOperator.cpp
index f5c9ee9..d2759c4 100644
--- a/relational_operators/SelectOperator.cpp
+++ b/relational_operators/SelectOperator.cpp
@@ -68,7 +68,7 @@ bool SelectOperator::getAllWorkOrders(
if (input_relation_.hasPartitionScheme()) {
const std::size_t num_partitions =
- input_relation_.getPartitionScheme().getPartitionSchemeHeader().getNumPartitions();
+ input_relation_.getPartitionScheme()->getPartitionSchemeHeader().getNumPartitions();
for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
for (const block_id input_block_id : input_relation_block_ids_in_partition_[part_id]) {
@@ -99,7 +99,7 @@ bool SelectOperator::getAllWorkOrders(
} else {
if (input_relation_.hasPartitionScheme()) {
const std::size_t num_partitions =
- input_relation_.getPartitionScheme().getPartitionSchemeHeader().getNumPartitions();
+ input_relation_.getPartitionScheme()->getPartitionSchemeHeader().getNumPartitions();
for (std::size_t part_id = 0; part_id < num_partitions; ++part_id) {
while (num_workorders_generated_in_partition_[part_id] <
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e26cc1e0/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 5846eda..989eaac 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -111,7 +111,7 @@ class SelectOperator : public RelationalOperator {
placement_scheme_ = input_relation.getNUMAPlacementSchemePtr();
#endif
if (input_relation.hasPartitionScheme()) {
- const PartitionScheme &part_scheme = input_relation.getPartitionScheme();
+ const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
const PartitionSchemeHeader &part_scheme_header = part_scheme.getPartitionSchemeHeader();
const std::size_t num_partitions = part_scheme_header.getNumPartitions();
input_relation_block_ids_in_partition_.resize(num_partitions);
@@ -174,7 +174,7 @@ class SelectOperator : public RelationalOperator {
placement_scheme_ = input_relation.getNUMAPlacementSchemePtr();
#endif
if (input_relation.hasPartitionScheme()) {
- const PartitionScheme &part_scheme = input_relation.getPartitionScheme();
+ const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
const PartitionSchemeHeader &part_scheme_header = part_scheme.getPartitionSchemeHeader();
const std::size_t num_partitions = part_scheme_header.getNumPartitions();
input_relation_block_ids_in_partition_.resize(num_partitions);
@@ -213,7 +213,7 @@ class SelectOperator : public RelationalOperator {
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
if (input_relation_.hasPartitionScheme()) {
const partition_id part_id =
- input_relation_.getPartitionScheme().getPartitionForBlock(input_block_id);
+ input_relation_.getPartitionScheme()->getPartitionForBlock(input_block_id);
input_relation_block_ids_in_partition_[part_id].push_back(input_block_id);
} else {
input_relation_block_ids_.push_back(input_block_id);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/e26cc1e0/storage/PreloaderThread.cpp
----------------------------------------------------------------------
diff --git a/storage/PreloaderThread.cpp b/storage/PreloaderThread.cpp
index 7f97a05..edaa143 100644
--- a/storage/PreloaderThread.cpp
+++ b/storage/PreloaderThread.cpp
@@ -93,7 +93,7 @@ std::size_t PreloaderThread::preloadNUMAAware(
relation.getNUMAPlacementSchemePtr();
DCHECK(placement_scheme != nullptr);
DCHECK(relation.hasPartitionScheme());
- const PartitionScheme &part_scheme = relation.getPartitionScheme();
+ const PartitionScheme &part_scheme = *relation.getPartitionScheme();
const PartitionSchemeHeader &part_scheme_header =
part_scheme.getPartitionSchemeHeader();
const std::size_t num_partitions = part_scheme_header.getNumPartitions();
[2/2] incubator-quickstep git commit: Added partition_id in
feedInputBlock.
Posted by zu...@apache.org.
Added partition_id in feedInputBlock.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/658b2924
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/658b2924
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/658b2924
Branch: refs/heads/feedInputBlock-part-id
Commit: 658b2924c97ca3adbaa63c85153ee1768b851b78
Parents: e26cc1e
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Jan 15 19:53:54 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Tue Jan 17 13:56:26 2017 -0800
----------------------------------------------------------------------
query_execution/PolicyEnforcerBase.cpp | 2 +-
query_execution/QueryExecutionMessages.proto | 3 +++
query_execution/QueryManagerBase.cpp | 5 +++--
query_execution/QueryManagerBase.hpp | 5 ++++-
query_execution/tests/QueryManagerSingleNode_unittest.cpp | 4 ++--
relational_operators/AggregationOperator.hpp | 3 ++-
relational_operators/BuildHashOperator.hpp | 4 ++--
relational_operators/DeleteOperator.hpp | 3 ++-
relational_operators/HashJoinOperator.hpp | 4 ++--
relational_operators/NestedLoopsJoinOperator.hpp | 3 ++-
relational_operators/RelationalOperator.hpp | 6 +++---
relational_operators/SampleOperator.hpp | 3 ++-
relational_operators/SaveBlocksOperator.hpp | 3 ++-
relational_operators/SelectOperator.hpp | 5 ++---
relational_operators/SortMergeRunOperator.hpp | 4 ++--
relational_operators/SortRunGenerationOperator.hpp | 3 ++-
relational_operators/tests/SortMergeRunOperator_unittest.cpp | 2 +-
storage/InsertDestination.cpp | 2 +-
storage/InsertDestination.hpp | 4 +++-
19 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 1a2ab46..a26b84e 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -118,7 +118,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
op_index = proto.operator_index();
admitted_queries_[query_id]->processDataPipelineMessage(
- op_index, proto.block_id(), proto.relation_id());
+ op_index, proto.block_id(), proto.relation_id(), proto.partition_id());
break;
}
case kWorkOrderFeedbackMessage: {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 28b5ebd..115a9a3 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -65,6 +65,9 @@ message DataPipelineMessage {
required fixed64 block_id = 2;
required int32 relation_id = 3;
required uint64 query_id = 4;
+
+ // Used by PartitionAwareInsertDestination.
+ optional uint64 partition_id = 5 [default = 0];
}
// Distributed version related messages.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 8e37da8..5f8c6a3 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -192,13 +192,14 @@ void QueryManagerBase::processOperator(const dag_node_index index,
void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
const block_id block,
- const relation_id rel_id) {
+ const relation_id rel_id,
+ const partition_id part_id) {
for (const dag_node_index consumer_index :
output_consumers_[op_index]) {
// Feed the streamed block to the consumer. Note that 'output_consumers_'
// only contain those dependents of operator with index = op_index which are
// eligible to receive streamed input.
- query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
+ query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id, part_id);
// Because of the streamed input just fed, check if there are any new
// WorkOrders available and if so, fetch them.
fetchNormalWorkOrders(consumer_index);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index ddb76d5..d0bb0ea 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -109,10 +109,13 @@ class QueryManagerBase {
* for the pipelining block.
* @param block The block id.
* @param rel_id The ID of the relation that produced 'block'.
+ * @param part_id The partition ID of 'block', if any. By default, a block
+ * belongs to the only partition (aka, no partition).
**/
void processDataPipelineMessage(const dag_node_index op_index,
const block_id block,
- const relation_id rel_id);
+ const relation_id rel_id,
+ const partition_id part_id = 0);
/**
* @brief Fetch all work orders currently available in relational operator and
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 87b8934..28ab388 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -177,8 +177,8 @@ class MockOperator: public RelationalOperator {
return true;
}
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
++num_calls_feedblock_;
MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index ce6015d..cc1009a 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -98,7 +98,8 @@ class AggregationOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index f0f42e3..9d2319a 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -114,8 +114,8 @@ class BuildHashOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index 9c3f357..7b69d9c 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -100,7 +100,8 @@ class DeleteOperator : public RelationalOperator {
return relation_.getID();
}
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
DCHECK(!relation_is_stored_);
relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 8829d1f..508cd03 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -190,8 +190,8 @@ class HashJoinOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
DCHECK(input_relation_id == probe_relation_.getID());
probe_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/NestedLoopsJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 951851d..f8eb080 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -141,7 +141,8 @@ class NestedLoopsJoinOperator : public RelationalOperator {
}
}
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
if (input_relation_id == left_input_relation_.getID()) {
left_relation_block_ids_.push_back(input_block_id);
} else if (input_relation_id == right_input_relation_.getID()) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index a7f4177..fdea307 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -138,11 +138,11 @@ class RelationalOperator {
* @brief Receive input blocks for this RelationalOperator.
*
* @param input_block_id The ID of the input block.
- *
* @param relation_id The ID of the relation that produced this input_block.
+ * @param part_id The partition ID of 'input_block_id'.
**/
- virtual void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) {}
+ virtual void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) {}
/**
* @brief Signal the end of feeding of input blocks for this
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/SampleOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SampleOperator.hpp b/relational_operators/SampleOperator.hpp
index ccf6595..e56201a 100644
--- a/relational_operators/SampleOperator.hpp
+++ b/relational_operators/SampleOperator.hpp
@@ -108,7 +108,8 @@ class SampleOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/SaveBlocksOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp
index 27fd911..cd79733 100644
--- a/relational_operators/SaveBlocksOperator.hpp
+++ b/relational_operators/SaveBlocksOperator.hpp
@@ -83,7 +83,8 @@ class SaveBlocksOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
destination_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 989eaac..2aa3918 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -210,10 +210,9 @@ class SelectOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
if (input_relation_.hasPartitionScheme()) {
- const partition_id part_id =
- input_relation_.getPartitionScheme()->getPartitionForBlock(input_block_id);
input_relation_block_ids_in_partition_[part_id].push_back(input_block_id);
} else {
input_relation_block_ids_.push_back(input_block_id);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index aff7d8d..d2d9a2a 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -144,8 +144,8 @@ class SortMergeRunOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
if (started_) {
initializeInputRuns();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index a2ffb2b..25a1273 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -124,7 +124,8 @@ class SortRunGenerationOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
DCHECK(input_relation_id == input_relation_.getID());
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index 74fecec..7a46e6e 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -1602,7 +1602,7 @@ class SortMergeRunOperatorTest : public ::testing::Test {
// Feed blocks.
DVLOG(1) << "Feeding " << to_feed.size() << " blocks.";
for (const block_id block : to_feed) {
- merge_op_->feedInputBlock(block, input_table_->getID());
+ merge_op_->feedInputBlock(block, input_table_->getID(), 0 /* partition_id */);
}
// Remove fed blocks.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 19bb356..944998f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -789,7 +789,7 @@ void PartitionAwareInsertDestination::returnBlockInPartition(MutableBlockReferen
<< "invalidated one or more IndexSubBlocks.");
}
// Note that the block will only be sent if it's full (true).
- sendBlockFilledMessage(block->getID());
+ sendBlockFilledMessage(block->getID(), part_id);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/658b2924/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3487638..c3c40bd 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -216,13 +216,15 @@ class InsertDestination : public InsertDestinationInterface {
* scheduler.
*
* @param id The id of the StorageBlock to be pipelined.
+ * @param part_id The partition id of Block <id>, if any.
**/
- void sendBlockFilledMessage(const block_id id) const {
+ void sendBlockFilledMessage(const block_id id, const partition_id part_id = 0) const {
serialization::DataPipelineMessage proto;
proto.set_operator_index(relational_op_index_);
proto.set_block_id(id);
proto.set_relation_id(relation_.getID());
proto.set_query_id(query_id_);
+ proto.set_partition_id(part_id);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();