You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/01/17 19:33:05 UTC

incubator-quickstep git commit: Added partition_id in feedInputBlock.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/feedInputBlock-part-id [created] b553e0115


Added partition_id in feedInputBlock.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/b553e011
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/b553e011
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/b553e011

Branch: refs/heads/feedInputBlock-part-id
Commit: b553e0115fe4130fc54d9419d0a1e6ab3ccfbfc4
Parents: b0e5968
Author: Zuyu Zhang <zu...@apache.org>
Authored: Sun Jan 15 19:53:54 2017 -0800
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Mon Jan 16 09:34:24 2017 -0800

----------------------------------------------------------------------
 query_execution/PolicyEnforcerBase.cpp                       | 2 +-
 query_execution/QueryExecutionMessages.proto                 | 3 +++
 query_execution/QueryManagerBase.cpp                         | 5 +++--
 query_execution/QueryManagerBase.hpp                         | 5 ++++-
 query_execution/tests/QueryManagerSingleNode_unittest.cpp    | 4 ++--
 relational_operators/AggregationOperator.hpp                 | 3 ++-
 relational_operators/BuildHashOperator.hpp                   | 4 ++--
 relational_operators/DeleteOperator.hpp                      | 3 ++-
 relational_operators/HashJoinOperator.hpp                    | 4 ++--
 relational_operators/NestedLoopsJoinOperator.hpp             | 3 ++-
 relational_operators/RelationalOperator.hpp                  | 6 +++---
 relational_operators/SampleOperator.hpp                      | 3 ++-
 relational_operators/SaveBlocksOperator.hpp                  | 3 ++-
 relational_operators/SelectOperator.hpp                      | 5 ++---
 relational_operators/SortMergeRunOperator.hpp                | 4 ++--
 relational_operators/SortRunGenerationOperator.hpp           | 3 ++-
 relational_operators/tests/SortMergeRunOperator_unittest.cpp | 2 +-
 storage/InsertDestination.cpp                                | 2 +-
 storage/InsertDestination.hpp                                | 4 +++-
 19 files changed, 41 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/PolicyEnforcerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/PolicyEnforcerBase.cpp b/query_execution/PolicyEnforcerBase.cpp
index 1a2ab46..a26b84e 100644
--- a/query_execution/PolicyEnforcerBase.cpp
+++ b/query_execution/PolicyEnforcerBase.cpp
@@ -118,7 +118,7 @@ void PolicyEnforcerBase::processMessage(const TaggedMessage &tagged_message) {
 
       op_index = proto.operator_index();
       admitted_queries_[query_id]->processDataPipelineMessage(
-          op_index, proto.block_id(), proto.relation_id());
+          op_index, proto.block_id(), proto.relation_id(), proto.partition_id());
       break;
     }
     case kWorkOrderFeedbackMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/QueryExecutionMessages.proto
----------------------------------------------------------------------
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index 28b5ebd..115a9a3 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -65,6 +65,9 @@ message DataPipelineMessage {
   required fixed64 block_id = 2;
   required int32 relation_id = 3;
   required uint64 query_id = 4;
+
+  // Used by PartitionAwareInsertDestination.
+  optional uint64 partition_id = 5 [default = 0];
 }
 
 // Distributed version related messages.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/QueryManagerBase.cpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 8e37da8..5f8c6a3 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -192,13 +192,14 @@ void QueryManagerBase::processOperator(const dag_node_index index,
 
 void QueryManagerBase::processDataPipelineMessage(const dag_node_index op_index,
                                                   const block_id block,
-                                                  const relation_id rel_id) {
+                                                  const relation_id rel_id,
+                                                  const partition_id part_id) {
   for (const dag_node_index consumer_index :
        output_consumers_[op_index]) {
     // Feed the streamed block to the consumer. Note that 'output_consumers_'
     // only contain those dependents of operator with index = op_index which are
     // eligible to receive streamed input.
-    query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id);
+    query_dag_->getNodePayloadMutable(consumer_index)->feedInputBlock(block, rel_id, part_id);
     // Because of the streamed input just fed, check if there are any new
     // WorkOrders available and if so, fetch them.
     fetchNormalWorkOrders(consumer_index);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/QueryManagerBase.hpp
----------------------------------------------------------------------
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index ddb76d5..d0bb0ea 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -109,10 +109,13 @@ class QueryManagerBase {
    *        for the pipelining block.
    * @param block The block id.
    * @param rel_id The ID of the relation that produced 'block'.
+   * @param part_id The partition ID of 'block', if any. By default, a block
+   *        belongs to the only partition (aka, no partition).
    **/
   void processDataPipelineMessage(const dag_node_index op_index,
                                   const block_id block,
-                                  const relation_id rel_id);
+                                  const relation_id rel_id,
+                                  const partition_id part_id = 0);
 
   /**
    * @brief Fetch all work orders currently available in relational operator and

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/query_execution/tests/QueryManagerSingleNode_unittest.cpp
----------------------------------------------------------------------
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 87b8934..28ab388 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -177,8 +177,8 @@ class MockOperator: public RelationalOperator {
     return true;
   }
 
-  void feedInputBlock(const block_id input_block_id,
-                      const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     ++num_calls_feedblock_;
     MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/AggregationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index ce6015d..cc1009a 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -98,7 +98,8 @@ class AggregationOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     input_relation_block_ids_.push_back(input_block_id);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/BuildHashOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index f0f42e3..9d2319a 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -114,8 +114,8 @@ class BuildHashOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id,
-                      const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     input_relation_block_ids_.push_back(input_block_id);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index 9c3f357..7b69d9c 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -100,7 +100,8 @@ class DeleteOperator : public RelationalOperator {
     return relation_.getID();
   }
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     DCHECK(!relation_is_stored_);
     relation_block_ids_.push_back(input_block_id);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 8829d1f..508cd03 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -190,8 +190,8 @@ class HashJoinOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id,
-                      const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     DCHECK(input_relation_id == probe_relation_.getID());
     probe_relation_block_ids_.push_back(input_block_id);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/NestedLoopsJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 951851d..f8eb080 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -141,7 +141,8 @@ class NestedLoopsJoinOperator : public RelationalOperator {
     }
   }
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     if (input_relation_id == left_input_relation_.getID()) {
       left_relation_block_ids_.push_back(input_block_id);
     } else if (input_relation_id == right_input_relation_.getID()) {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/RelationalOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index a7f4177..fdea307 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -138,11 +138,11 @@ class RelationalOperator {
    * @brief Receive input blocks for this RelationalOperator.
    *
    * @param input_block_id The ID of the input block.
-   *
    * @param relation_id The ID of the relation that produced this input_block.
+   * @param part_id The partition ID of 'input_block_id'.
    **/
-  virtual void feedInputBlock(const block_id input_block_id,
-                              const relation_id input_relation_id) {}
+  virtual void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                              const partition_id part_id) {}
 
   /**
    * @brief Signal the end of feeding of input blocks for this

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SampleOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SampleOperator.hpp b/relational_operators/SampleOperator.hpp
index ccf6595..e56201a 100644
--- a/relational_operators/SampleOperator.hpp
+++ b/relational_operators/SampleOperator.hpp
@@ -108,7 +108,8 @@ class SampleOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     input_relation_block_ids_.push_back(input_block_id);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SaveBlocksOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp
index 27fd911..cd79733 100644
--- a/relational_operators/SaveBlocksOperator.hpp
+++ b/relational_operators/SaveBlocksOperator.hpp
@@ -83,7 +83,8 @@ class SaveBlocksOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     destination_block_ids_.push_back(input_block_id);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SelectOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index 5846eda..c8e6162 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -210,10 +210,9 @@ class SelectOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     if (input_relation_.hasPartitionScheme()) {
-      const partition_id part_id =
-          input_relation_.getPartitionScheme().getPartitionForBlock(input_block_id);
       input_relation_block_ids_in_partition_[part_id].push_back(input_block_id);
     } else {
       input_relation_block_ids_.push_back(input_block_id);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index aff7d8d..d2d9a2a 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -144,8 +144,8 @@ class SortMergeRunOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id,
-                      const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     input_relation_block_ids_.push_back(input_block_id);
     if (started_) {
       initializeInputRuns();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index a2ffb2b..25a1273 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -124,7 +124,8 @@ class SortRunGenerationOperator : public RelationalOperator {
 
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
-  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
+  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
+                      const partition_id part_id) override {
     DCHECK(input_relation_id == input_relation_.getID());
     input_relation_block_ids_.push_back(input_block_id);
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index 74fecec..7a46e6e 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -1602,7 +1602,7 @@ class SortMergeRunOperatorTest : public ::testing::Test {
     // Feed blocks.
     DVLOG(1) << "Feeding " << to_feed.size() << " blocks.";
     for (const block_id block : to_feed) {
-      merge_op_->feedInputBlock(block, input_table_->getID());
+      merge_op_->feedInputBlock(block, input_table_->getID(), 0 /* partition_id */);
     }
 
     // Remove fed blocks.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 19bb356..944998f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -789,7 +789,7 @@ void PartitionAwareInsertDestination::returnBlockInPartition(MutableBlockReferen
                                                        << "invalidated one or more IndexSubBlocks.");
   }
   // Note that the block will only be sent if it's full (true).
-  sendBlockFilledMessage(block->getID());
+  sendBlockFilledMessage(block->getID(), part_id);
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/b553e011/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 3487638..c3c40bd 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -216,13 +216,15 @@ class InsertDestination : public InsertDestinationInterface {
    *        scheduler.
    *
    * @param id The id of the StorageBlock to be pipelined.
+   * @param part_id The partition id of Block <id>, if any.
    **/
-  void sendBlockFilledMessage(const block_id id) const {
+  void sendBlockFilledMessage(const block_id id, const partition_id part_id = 0) const {
     serialization::DataPipelineMessage proto;
     proto.set_operator_index(relational_op_index_);
     proto.set_block_id(id);
     proto.set_relation_id(relation_.getID());
     proto.set_query_id(query_id_);
+    proto.set_partition_id(part_id);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();