You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/01/26 02:34:37 UTC
[16/22] incubator-quickstep git commit: Added partition_id in
feedInputBlock.
Added partition_id in feedInputBlock.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/ee3b7f0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/ee3b7f0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/ee3b7f0e
Branch: refs/heads/exact-filter
Commit: ee3b7f0e976523e0421d3003cd7e8cb972fccbbe
Parents: 6c10e99
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Jan 15 19:53:54 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jan 18 12:45:15 2017 -0800
----------------------------------------------------------------------
query_execution/PolicyEnforcerBase.cpp | 2 +-
query_execution/QueryExecutionMessages.proto | 3 +++
query_execution/QueryManagerBase.cpp | 5 +++--
query_execution/QueryManagerBase.hpp | 5 ++++-
query_execution/tests/QueryManagerSingleNode_unittest.cpp | 4 ++--
relational_operators/AggregationOperator.hpp | 3 ++-
relational_operators/BuildHashOperator.hpp | 4 ++--
relational_operators/DeleteOperator.hpp | 3 ++-
relational_operators/HashJoinOperator.hpp | 4 ++--
relational_operators/NestedLoopsJoinOperator.hpp | 3 ++-
relational_operators/RelationalOperator.hpp | 6 +++---
relational_operators/SampleOperator.hpp | 3 ++-
relational_operators/SaveBlocksOperator.hpp | 3 ++-
relational_operators/SelectOperator.hpp | 5 ++---
relational_operators/SortMergeRunOperator.hpp | 4 ++--
relational_operators/SortRunGenerationOperator.hpp | 3 ++-
relational_operators/tests/SortMergeRunOperator_unittest.cpp | 2 +-
storage/InsertDestination.cpp | 2 +-
storage/InsertDestination.hpp | 4 +++-
19 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 1a2ab46..a26b84e 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -118,7 +118,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
op_index = proto.operator_index();
admitted_queries_[query_id]->processDataPipelineMessage(
- op_index, proto.block_id(), proto.relation_id());
+ op_index, proto.block_id(), proto.relation_id(), proto.partition_id());
break;
}
case kWorkOrderFeedbackMessage: {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 28b5ebd..115a9a3 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -65,6 +65,9 @@ message DataPipelineMessage {
required fixed64 block_id = 2;
required int32 relation_id = 3;
required uint64 query_id = 4;
+
+ // Used by PartitionAwareInsertDestination.
+ optional uint64 partition_id = 5 [default = 0];
}
// Distributed version related messages.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 8e37da8..5f8c6a3 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -192,13 +192,14 @@ void QueryManagerBase::processOperator(const dag_node_index index,
void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
const block_id block,
- const relation_id rel_id) {
+ const relation_id rel_id,
+ const partition_id part_id) {
for (const dag_node_index consumer_index :
output_consumers_[op_index]) {
// Feed the streamed block to the consumer. Note that 'output_consumers_'
// only contain those dependents of operator with index = op_index which are
// eligible to receive streamed input.
- query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
+ query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id, part_id);
// Because of the streamed input just fed, check if there are any new
// WorkOrders available and if so, fetch them.
fetchNormalWorkOrders(consumer_index);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index ddb76d5..d0bb0ea 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -109,10 +109,13 @@ class QueryManagerBase {
* for the pipelining block.
* @param block The block id.
* @param rel_id The ID of the relation that produced 'block'.
+ * @param part_id The partition ID of 'block', if any. By default, a block
+ * blongs to the only partition (aka, no partition).
**/
void processDataPipelineMessage(const dag_node_index op_index,
const block_id block,
- const relation_id rel_id);
+ const relation_id rel_id,
+ const partition_id part_id = 0);
/**
* @brief Fetch all work orders currently available in relational operator and
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 87b8934..28ab388 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -177,8 +177,8 @@ class MockOperator: public RelationalOperator {
return true;
}
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
++num_calls_feedblock_;
MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index c4e887d..2bd69f3 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -98,7 +98,8 @@ class AggregationOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index 074c1e1..dec121c 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -114,8 +114,8 @@ class BuildHashOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index 9c3f357..7b69d9c 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -100,7 +100,8 @@ class DeleteOperator : public RelationalOperator {
return relation_.getID();
}
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
DCHECK(!relation_is_stored_);
relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 190f9d2..98c87bf 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -190,8 +190,8 @@ class HashJoinOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
DCHECK(input_relation_id == probe_relation_.getID());
probe_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/NestedLoopsJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 951851d..f8eb080 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -141,7 +141,8 @@ class NestedLoopsJoinOperator : public RelationalOperator {
}
}
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
if (input_relation_id == left_input_relation_.getID()) {
left_relation_block_ids_.push_back(input_block_id);
} else if (input_relation_id == right_input_relation_.getID()) {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index a7f4177..fdea307 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -138,11 +138,11 @@ class RelationalOperator {
* @brief Receive input blocks for this RelationalOperator.
*
* @param input_block_id The ID of the input block.
- *
* @param relation_id The ID of the relation that produced this input_block.
+ * @param part_id The partition ID of 'input_block_id'.
**/
- virtual void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) {}
+ virtual void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) {}
/**
* @brief Signal the end of feeding of input blocks for this
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/SampleOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SampleOperator.hpp b/relational_operators/SampleOperator.hpp
index ccf6595..e56201a 100644
--- a/relational_operators/SampleOperator.hpp
+++ b/relational_operators/SampleOperator.hpp
@@ -108,7 +108,8 @@ class SampleOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/SaveBlocksOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp
index 27fd911..cd79733 100644
--- a/relational_operators/SaveBlocksOperator.hpp
+++ b/relational_operators/SaveBlocksOperator.hpp
@@ -83,7 +83,8 @@ class SaveBlocksOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
destination_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index e58ff81..79ab37f 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -204,10 +204,9 @@ class SelectOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
if (input_relation_.hasPartitionScheme()) {
- const partition_id part_id =
- input_relation_.getPartitionScheme()->getPartitionForBlock(input_block_id);
input_relation_block_ids_in_partition_[part_id].push_back(input_block_id);
} else {
input_relation_block_ids_.push_back(input_block_id);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index aff7d8d..d2d9a2a 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -144,8 +144,8 @@ class SortMergeRunOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id,
- const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
input_relation_block_ids_.push_back(input_block_id);
if (started_) {
initializeInputRuns();
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index a2ffb2b..25a1273 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -124,7 +124,8 @@ class SortRunGenerationOperator : public RelationalOperator {
bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
- void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+ void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+ const partition_id part_id) override {
DCHECK(input_relation_id == input_relation_.getID());
input_relation_block_ids_.push_back(input_block_id);
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index 74fecec..7a46e6e 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -1602,7 +1602,7 @@ class SortMergeRunOperatorTest : public ::testing::Test {
// Feed blocks.
DVLOG(1) << "Feeding " << to_feed.size() << " blocks.";
for (const block_id block : to_feed) {
- merge_op_->feedInputBlock(block, input_table_->getID());
+ merge_op_->feedInputBlock(block, input_table_->getID(), 0 /* partition_id */);
}
// Remove fed blocks.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 19bb356..944998f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -789,7 +789,7 @@ void PartitionAwareInsertDestination::returnBlockInPartition(MutableBlockReferen
<< "invalidated one or more IndexSubBlocks.");
}
// Note that the block will only be sent if it's full (true).
- sendBlockFilledMessage(block->getID());
+ sendBlockFilledMessage(block->getID(), part_id);
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/ee3b7f0e/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3487638..c3c40bd 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -216,13 +216,15 @@ class InsertDestination : public InsertDestinationInterface {
* scheduler.
*
* @param id The id of the StorageBlock to be pipelined.
+ * @param part_id The partition id of Block <id>, if any.
**/
- void sendBlockFilledMessage(const block_id id) const {
+ void sendBlockFilledMessage(const block_id id, const partition_id part_id = 0) const {
serialization::DataPipelineMessage proto;
proto.set_operator_index(relational_op_index_);
proto.set_block_id(id);
proto.set_relation_id(relation_.getID());
proto.set_query_id(query_id_);
+ proto.set_partition_id(part_id);
// NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
const std::size_t proto_length = proto.ByteSize();