You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2017/06/14 21:08:36 UTC

incubator-quickstep git commit: Minor refactors in WorkOrderFactory.

Repository: incubator-quickstep
Updated Branches:
  refs/heads/master d0c55320f -> 0f549acb5


Minor refactors in WorkOrderFactory.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/0f549acb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/0f549acb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/0f549acb

Branch: refs/heads/master
Commit: 0f549acb54c56337a3d7e805e125e78aea48e013
Parents: d0c5532
Author: Zuyu Zhang <zu...@apache.org>
Authored: Wed Jun 14 15:59:48 2017 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Wed Jun 14 15:59:48 2017 -0500

----------------------------------------------------------------------
 relational_operators/WorkOrderFactory.cpp | 103 +++++++++++++------------
 1 file changed, 53 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/0f549acb/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index bb71ec4..4c1ffa9 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -61,6 +61,7 @@
 #include "tmb/id_typedefs.h"
 
 using std::move;
+using std::size_t;
 using std::vector;
 
 namespace quickstep {
@@ -83,12 +84,14 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       << "Attempted to create WorkOrder from an invalid proto description:\n"
       << proto.DebugString();
 
+  const size_t query_id = proto.query_id();
+
   switch (proto.work_order_type()) {
     case serialization::AGGREGATION: {
-      LOG(INFO) << "Creating AggregationWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating AggregationWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new AggregationWorkOrder(
-          proto.query_id(),
+          query_id,
           proto.GetExtension(serialization::AggregationWorkOrder::block_id),
           query_context->getAggregationState(
               proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)),
@@ -96,11 +99,11 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::AggregationWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::BUILD_AGGREGATION_EXISTENCE_MAP: {
-      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating BuildAggregationExistenceMapWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
 
       return new BuildAggregationExistenceMapWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::relation_id)),
           proto.GetExtension(serialization::BuildAggregationExistenceMapWorkOrder::build_block_id),
@@ -110,14 +113,14 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::BUILD_LIP_FILTER: {
-      LOG(INFO) << "Creating BuildLIPFilterWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating BuildLIPFilterWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
 
       const QueryContext::lip_deployment_id lip_deployment_index =
           proto.GetExtension(serialization::BuildLIPFilterWorkOrder::lip_deployment_index);
 
       return new BuildLIPFilterWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildLIPFilterWorkOrder::relation_id)),
           proto.GetExtension(serialization::BuildLIPFilterWorkOrder::build_block_id),
@@ -131,7 +134,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       const partition_id part_id =
           proto.GetExtension(serialization::BuildHashWorkOrder::partition_id);
 
-      LOG(INFO) << "Creating BuildHashWorkOrder (Partition " << part_id << ") for Query " << proto.query_id()
+      LOG(INFO) << "Creating BuildHashWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       vector<attribute_id> join_key_attributes;
       for (int i = 0; i < proto.ExtensionSize(serialization::BuildHashWorkOrder::join_key_attributes); ++i) {
@@ -140,7 +143,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
 
       return new BuildHashWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::BuildHashWorkOrder::relation_id)),
           move(join_key_attributes),
@@ -154,9 +157,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::DELETE: {
-      LOG(INFO) << "Creating DeleteWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating DeleteWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       return new DeleteWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::DeleteWorkOrder::relation_id)),
           proto.GetExtension(serialization::DeleteWorkOrder::block_id),
@@ -168,10 +171,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           bus);
     }
     case serialization::DESTROY_AGGREGATION_STATE: {
-      LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating DestroyAggregationStateWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new DestroyAggregationStateWorkOrder(
-          proto.query_id(),
+          query_id,
           proto.GetExtension(
               serialization::DestroyAggregationStateWorkOrder::aggr_state_index),
           query_context);
@@ -180,17 +183,17 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       const partition_id part_id =
           proto.GetExtension(serialization::DestroyHashWorkOrder::partition_id);
 
-      LOG(INFO) << "Creating DestroyHashWorkOrder (Partition " << part_id << ") for Query " << proto.query_id()
+      LOG(INFO) << "Creating DestroyHashWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new DestroyHashWorkOrder(
-          proto.query_id(),
+          query_id,
           proto.GetExtension(
               serialization::DestroyHashWorkOrder::join_hash_table_index),
           part_id,
           query_context);
     }
     case serialization::DROP_TABLE: {
-      LOG(INFO) << "Creating DropTableWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating DropTableWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       vector<block_id> blocks;
       for (int i = 0; i < proto.ExtensionSize(serialization::DropTableWorkOrder::block_ids); ++i) {
         blocks.push_back(
@@ -198,7 +201,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
 
       return new DropTableWorkOrder(
-          proto.query_id(),
+          query_id,
           move(blocks),
           storage_manager,
           proto.HasExtension(serialization::DropTableWorkOrder::relation_id)
@@ -207,12 +210,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           catalog_database);
     }
     case serialization::FINALIZE_AGGREGATION: {
-      LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating FinalizeAggregationWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       // TODO(quickstep-team): Handle inner-table partitioning in the distributed
       // setting.
       return new FinalizeAggregationWorkOrder(
-          proto.query_id(),
+          query_id,
           0uL /* partition_id */,
           query_context->getAggregationState(proto.GetExtension(
               serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
@@ -268,10 +271,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
 
       switch (hash_join_work_order_type) {
         case serialization::HashJoinWorkOrder::HASH_ANTI_JOIN: {
-          LOG(INFO) << "Creating HashAntiJoinWorkOrder (Partition " << part_id << ") for Query " << proto.query_id()
+          LOG(INFO) << "Creating HashAntiJoinWorkOrder (Partition " << part_id << ") for Query " << query_id
                     << " in Shiftboss " << shiftboss_index;
           return new HashAntiJoinWorkOrder(
-              proto.query_id(),
+              query_id,
               build_relation,
               probe_relation,
               move(join_key_attributes),
@@ -286,10 +289,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               lip_filter_adaptive_prober);
         }
         case serialization::HashJoinWorkOrder::HASH_INNER_JOIN: {
-          LOG(INFO) << "Creating HashInnerJoinWorkOrder (Partition " << part_id << ") for Query " << proto.query_id()
+          LOG(INFO) << "Creating HashInnerJoinWorkOrder (Partition " << part_id << ") for Query " << query_id
                     << " in Shiftboss " << shiftboss_index;
           return new HashInnerJoinWorkOrder(
-              proto.query_id(),
+              query_id,
               build_relation,
               probe_relation,
               move(join_key_attributes),
@@ -312,10 +315,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
                 proto.GetExtension(serialization::HashJoinWorkOrder::is_selection_on_build, i));
           }
 
-          LOG(INFO) << "Creating HashOuterJoinWorkOrder (Partition " << part_id << ") for Query " << proto.query_id()
+          LOG(INFO) << "Creating HashOuterJoinWorkOrder (Partition " << part_id << ") for Query " << query_id
                     << " in Shiftboss " << shiftboss_index;
           return new HashOuterJoinWorkOrder(
-              proto.query_id(),
+              query_id,
               build_relation,
               probe_relation,
               move(join_key_attributes),
@@ -330,10 +333,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               lip_filter_adaptive_prober);
         }
         case serialization::HashJoinWorkOrder::HASH_SEMI_JOIN: {
-          LOG(INFO) << "Creating HashSemiJoinWorkOrder (Partition " << part_id << ") for Query " << proto.query_id()
+          LOG(INFO) << "Creating HashSemiJoinWorkOrder (Partition " << part_id << ") for Query " << query_id
                     << " in Shiftboss " << shiftboss_index;
           return new HashSemiJoinWorkOrder(
-              proto.query_id(),
+              query_id,
               build_relation,
               probe_relation,
               move(join_key_attributes),
@@ -352,19 +355,19 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
     }
     case serialization::INSERT: {
-      LOG(INFO) << "Creating InsertWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating InsertWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       return new InsertWorkOrder(
-          proto.query_id(),
+          query_id,
           query_context->getInsertDestination(
               proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
           query_context->releaseTuple(
               proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
     }
     case serialization::NESTED_LOOP_JOIN: {
-      LOG(INFO) << "Creating NestedLoopsJoinWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating NestedLoopsJoinWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new NestedLoopsJoinWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id)),
           catalog_database->getRelationSchemaById(
@@ -380,9 +383,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::SAMPLE: {
-      LOG(INFO) << "Creating SampleWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating SampleWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       return new SampleWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SampleWorkOrder::relation_id)),
           proto.GetExtension(serialization::SampleWorkOrder::block_id),
@@ -393,16 +396,16 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::SAVE_BLOCKS: {
-      LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new SaveBlocksWorkOrder(
-          proto.query_id(),
+          query_id,
           proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
           proto.GetExtension(serialization::SaveBlocksWorkOrder::force),
           storage_manager);
     }
     case serialization::SELECT: {
-      LOG(INFO) << "Creating SelectWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating SelectWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       const bool simple_projection =
           proto.GetExtension(serialization::SelectWorkOrder::simple_projection);
       vector<attribute_id> simple_selection;
@@ -412,7 +415,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
 
       return new SelectWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SelectWorkOrder::relation_id)),
           proto.GetExtension(serialization::SelectWorkOrder::block_id),
@@ -430,7 +433,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::SelectWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::SORT_MERGE_RUN: {
-      LOG(INFO) << "Creating SortMergeRunWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating SortMergeRunWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       vector<merge_run_operator::Run> runs;
       for (int i = 0; i < proto.ExtensionSize(serialization::SortMergeRunWorkOrder::runs); ++i) {
@@ -444,7 +447,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
 
       return new SortMergeRunWorkOrder(
-          proto.query_id(),
+          query_id,
           query_context->getSortConfig(
               proto.GetExtension(serialization::SortMergeRunWorkOrder::sort_config_index)),
           catalog_database->getRelationSchemaById(
@@ -460,10 +463,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           bus);
     }
     case serialization::SORT_RUN_GENERATION: {
-      LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new SortRunGenerationWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::SortRunGenerationWorkOrder::relation_id)),
           proto.GetExtension(serialization::SortRunGenerationWorkOrder::block_id),
@@ -474,19 +477,19 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::TABLE_GENERATOR: {
-      LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating SortRunGenerationWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new TableGeneratorWorkOrder(
-          proto.query_id(),
+          query_id,
           query_context->getGeneratorFunctionHandle(
               proto.GetExtension(serialization::TableGeneratorWorkOrder::generator_function_index)),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)));
     }
     case serialization::TEXT_SCAN: {
-      LOG(INFO) << "Creating TextScanWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating TextScanWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       return new TextScanWorkOrder(
-          proto.query_id(),
+          query_id,
           proto.GetExtension(serialization::TextScanWorkOrder::filename),
           proto.GetExtension(serialization::TextScanWorkOrder::text_offset),
           proto.GetExtension(serialization::TextScanWorkOrder::text_segment_size),
@@ -497,14 +500,14 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           hdfs);
     }
     case serialization::UNION_ALL: {
-      LOG(INFO) << "Creating UnionAllWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating UnionAllWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       vector<attribute_id> select_attribute_id;
       for (int i = 0; i < proto.ExtensionSize(serialization::UnionAllWorkOrder::select_attribute_id); ++i) {
         select_attribute_id.push_back(
             proto.GetExtension(serialization::UnionAllWorkOrder::select_attribute_id, i));
       }
       return new UnionAllWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::UnionAllWorkOrder::relation_id)),
           proto.GetExtension(serialization::UnionAllWorkOrder::block_id),
@@ -514,9 +517,9 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager);
     }
     case serialization::UPDATE: {
-      LOG(INFO) << "Creating UpdateWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      LOG(INFO) << "Creating UpdateWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
       return new UpdateWorkOrder(
-          proto.query_id(),
+          query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::UpdateWorkOrder::relation_id)),
           proto.GetExtension(serialization::UpdateWorkOrder::block_id),
@@ -532,7 +535,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           bus);
     }
     case serialization::WINDOW_AGGREGATION: {
-      LOG(INFO) << "Creating WindowAggregationWorkOrder for Query " << proto.query_id()
+      LOG(INFO) << "Creating WindowAggregationWorkOrder for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       vector<block_id> blocks;
       for (int i = 0; i < proto.ExtensionSize(serialization::WindowAggregationWorkOrder::block_ids); ++i) {
@@ -541,7 +544,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
 
       return new WindowAggregationWorkOrder(
-          proto.query_id(),
+          query_id,
           query_context->getWindowAggregationState(
               proto.GetExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index)),
           move(blocks),