You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/06/09 05:41:04 UTC

[41/50] [abbrv] incubator-quickstep git commit: Added Query ID to Relational operators and WorkOrders.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index f92affe..f54e925 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -88,6 +88,7 @@ class SortMergeRunOperator : public RelationalOperator {
    *              \c top_k is 0.
    * @param input_relation_is_stored Boolean to indicate is input relation is
    *                                 stored or streamed.
+   * @param query_id The ID of the query to which this operator belongs.
    **/
   SortMergeRunOperator(const CatalogRelation &input_relation,
                        const CatalogRelation &output_relation,
@@ -97,8 +98,10 @@ class SortMergeRunOperator : public RelationalOperator {
                        const QueryContext::sort_config_id sort_config_index,
                        const std::size_t merge_factor,
                        const std::size_t top_k,
-                       const bool input_relation_is_stored)
-      : input_relation_(input_relation),
+                       const bool input_relation_is_stored,
+                       const std::size_t query_id)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         sort_config_index_(sort_config_index),
@@ -216,6 +219,7 @@ class SortMergeRunWorkOrder : public WorkOrder {
    * @param input_runs Input runs to merge.
    * @param top_k If non-zero will merge only \c top_k tuples.
    * @param merge_level Merge level in the merge tree.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param output_destination The InsertDestination to create new blocks.
    * @param storage_manager The StorageManager to use.
    * @param operator_index Merge-run operator index to send feedback messages
@@ -229,12 +233,14 @@ class SortMergeRunWorkOrder : public WorkOrder {
       std::vector<merge_run_operator::Run> &&input_runs,
       const std::size_t top_k,
       const std::size_t merge_level,
+      const std::size_t query_id,
       InsertDestination *output_destination,
       StorageManager *storage_manager,
       const std::size_t operator_index,
       const tmb::client_id scheduler_client_id,
       MessageBus *bus)
-      : sort_config_(sort_config),
+      : WorkOrder(query_id),
+        sort_config_(sort_config),
         run_relation_(run_relation),
         input_runs_(std::move(input_runs)),
         top_k_(top_k),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/SortRunGenerationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.cpp b/relational_operators/SortRunGenerationOperator.cpp
index 9bb3f51..e352f9e 100644
--- a/relational_operators/SortRunGenerationOperator.cpp
+++ b/relational_operators/SortRunGenerationOperator.cpp
@@ -54,6 +54,7 @@ bool SortRunGenerationOperator::getAllWorkOrders(
             new SortRunGenerationWorkOrder(input_relation_,
                                            input_block_id,
                                            sort_config,
+                                           query_id_,
                                            output_destination,
                                            storage_manager),
             op_index_);
@@ -69,6 +70,7 @@ bool SortRunGenerationOperator::getAllWorkOrders(
               input_relation_,
               input_relation_block_ids_[num_workorders_generated_],
               sort_config,
+              query_id_,
               output_destination,
               storage_manager),
           op_index_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index 04290a9..3da9813 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -83,13 +83,16 @@ class SortRunGenerationOperator : public RelationalOperator {
    * @param input_relation_is_stored Does the input relation contain the blocks
    *                                 to sort. If \c false, the blocks are
    *                                 streamed.
+   * @param query_id The ID of the query to which this operator belongs.
    **/
   SortRunGenerationOperator(const CatalogRelation &input_relation,
                             const CatalogRelation &output_relation,
                             const QueryContext::insert_destination_id output_destination_index,
                             const QueryContext::sort_config_id sort_config_index,
-                            bool input_relation_is_stored)
-      : input_relation_(input_relation),
+                            bool input_relation_is_stored,
+                            const std::size_t query_id)
+      : RelationalOperator(query_id),
+        input_relation_(input_relation),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         sort_config_index_(sort_config_index),
@@ -152,6 +155,7 @@ class SortRunGenerationWorkOrder : public WorkOrder {
    * @param input_block_id The block id.
    * @param sort_config The Sort configuration specifying ORDER BY, ordering,
    *        and null ordering.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param output_destination The InsertDestination to store the sorted blocks
    *        of runs.
    * @param storage_manager The StorageManager to use.
@@ -159,9 +163,11 @@ class SortRunGenerationWorkOrder : public WorkOrder {
   SortRunGenerationWorkOrder(const CatalogRelationSchema &input_relation,
                              const block_id input_block_id,
                              const SortConfiguration &sort_config,
+                             const std::size_t query_id,
                              InsertDestination *output_destination,
                              StorageManager *storage_manager)
-      : input_relation_(input_relation),
+      : WorkOrder(query_id),
+        input_relation_(input_relation),
         input_block_id_(input_block_id),
         sort_config_(sort_config),
         output_destination_(DCHECK_NOTNULL(output_destination)),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/TableGeneratorOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp
index 886d05f..fb1f743 100644
--- a/relational_operators/TableGeneratorOperator.cpp
+++ b/relational_operators/TableGeneratorOperator.cpp
@@ -42,8 +42,11 @@ bool TableGeneratorOperator::getAllWorkOrders(
     // Currently the generator function is not abstracted to be parallelizable,
     // so just produce one work order.
     container->addNormalWorkOrder(
-        new TableGeneratorWorkOrder(query_context->getGeneratorFunctionHandle(generator_function_index_),
-                                    query_context->getInsertDestination(output_destination_index_)),
+        new TableGeneratorWorkOrder(
+            query_context->getGeneratorFunctionHandle(
+                generator_function_index_),
+            query_id_,
+            query_context->getInsertDestination(output_destination_index_)),
         op_index_);
     started_ = true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index a26b227..bfc70c5 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -60,12 +60,14 @@ class TableGeneratorOperator : public RelationalOperator {
    *        QueryContext to insert the generated output.
    * @param generator_function_index The index of the GeneratorFunctionHandle in
    *        the QueryContext.
-   *
+   * @param query_id The ID of the query to which this operator belongs.
    **/
   TableGeneratorOperator(const CatalogRelation &output_relation,
                          const QueryContext::insert_destination_id output_destination_index,
-                         const QueryContext::generator_function_id generator_function_index)
-      : output_relation_(output_relation),
+                         const QueryContext::generator_function_id generator_function_index,
+                         const std::size_t query_id)
+      : RelationalOperator(query_id),
+        output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         generator_function_index_(generator_function_index),
         started_(false) {
@@ -112,12 +114,15 @@ class TableGeneratorWorkOrder : public WorkOrder {
    * @brief Constructor.
    *
    * @param generator_function The GeneratorFunctionHandle to use.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param output_destination The InsertDestination to insert the generated
    *        output.
    **/
   TableGeneratorWorkOrder(const GeneratorFunctionHandle &function_handle,
+                          const std::size_t query_id,
                           InsertDestination *output_destination)
-      : function_handle_(function_handle),
+      : WorkOrder(query_id),
+        function_handle_(function_handle),
         output_destination_(DCHECK_NOTNULL(output_destination)) {}
 
   ~TableGeneratorWorkOrder() override {}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 5ede6f7..8db5ef1 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -167,6 +167,7 @@ bool TextScanOperator::getAllWorkOrders(
           container->addNormalWorkOrder(
               new TextSplitWorkOrder(file,
                                      process_escape_sequences_,
+                                     query_id_,
                                      storage_manager,
                                      op_index_,
                                      scheduler_client_id,
@@ -185,6 +186,7 @@ bool TextScanOperator::getAllWorkOrders(
                                     blob_work.size,
                                     field_terminator_,
                                     process_escape_sequences_,
+                                    query_id_,
                                     output_destination,
                                     storage_manager),
               op_index_);
@@ -204,6 +206,7 @@ bool TextScanOperator::getAllWorkOrders(
             new TextScanWorkOrder(file,
                                   field_terminator_,
                                   process_escape_sequences_,
+                                  query_id_,
                                   output_destination,
                                   storage_manager),
             op_index_);
@@ -235,9 +238,11 @@ void TextScanOperator::receiveFeedbackMessage(const WorkOrder::FeedbackMessage &
 TextScanWorkOrder::TextScanWorkOrder(const std::string &filename,
                                      const char field_terminator,
                                      const bool process_escape_sequences,
+                                     const std::size_t query_id,
                                      InsertDestination *output_destination,
                                      StorageManager *storage_manager)
-    : is_file_(true),
+    : WorkOrder(query_id),
+      is_file_(true),
       filename_(filename),
       field_terminator_(field_terminator),
       text_blob_(0),
@@ -253,9 +258,11 @@ TextScanWorkOrder::TextScanWorkOrder(const block_id text_blob,
                                      const std::size_t text_size,
                                      const char field_terminator,
                                      const bool process_escape_sequences,
+                                     const std::size_t query_id,
                                      InsertDestination *output_destination,
                                      StorageManager *storage_manager)
-    : is_file_(false),
+    : WorkOrder(query_id),
+      is_file_(false),
       field_terminator_(field_terminator),
       text_blob_(text_blob),
       text_size_(text_size),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index a2d4ced..1d0c04f 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -134,14 +134,17 @@ class TextScanOperator : public RelationalOperator {
    * @param output_relation The output relation.
    * @param output_destination_index The index of the InsertDestination in the
    *        QueryContext to insert tuples.
+   * @param query_id The ID of the query to which this operator belongs.
    **/
   TextScanOperator(const std::string &file_pattern,
                    const char field_terminator,
                    const bool process_escape_sequences,
                    const bool parallelize_load,
                    const CatalogRelation &output_relation,
-                   const QueryContext::insert_destination_id output_destination_index)
-      : file_pattern_(file_pattern),
+                   const QueryContext::insert_destination_id output_destination_index,
+                   const std::size_t query_id)
+      : RelationalOperator(query_id),
+        file_pattern_(file_pattern),
         field_terminator_(field_terminator),
         process_escape_sequences_(process_escape_sequences),
         parallelize_load_(parallelize_load),
@@ -202,6 +205,7 @@ class TextScanWorkOrder : public WorkOrder {
    *        the text file.
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param output_destination The InsertDestination to insert tuples.
    * @param storage_manager The StorageManager to use.
    **/
@@ -209,6 +213,7 @@ class TextScanWorkOrder : public WorkOrder {
       const std::string &filename,
       const char field_terminator,
       const bool process_escape_sequences,
+      const std::size_t query_id,
       InsertDestination *output_destination,
       StorageManager *storage_manager);
 
@@ -221,6 +226,7 @@ class TextScanWorkOrder : public WorkOrder {
    *        the text file.
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param output_destination The InsertDestination to write the read tuples.
    * @param storage_manager The StorageManager to use.
    */
@@ -229,6 +235,7 @@ class TextScanWorkOrder : public WorkOrder {
       const std::size_t text_size,
       const char field_terminator,
       const bool process_escape_sequences,
+      const std::size_t query_id,
       InsertDestination *output_destination,
       StorageManager *storage_manager);
 
@@ -318,6 +325,7 @@ class TextSplitWorkOrder : public WorkOrder {
    * @param filename File to split into row-aligned blobs.
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param storage_manager The StorageManager to use.
    * @param operator_index Operator index of the current operator. This is used
    *                       to send new-work available message to Foreman.
@@ -326,11 +334,13 @@ class TextSplitWorkOrder : public WorkOrder {
    */
   TextSplitWorkOrder(const std::string &filename,
                      const bool process_escape_sequences,
+                     const std::size_t query_id,
                      StorageManager *storage_manager,
                      const std::size_t operator_index,
                      const tmb::client_id scheduler_client_id,
                      MessageBus *bus)
-      : filename_(filename),
+      : WorkOrder(query_id),
+        filename_(filename),
         process_escape_sequences_(process_escape_sequences),
         storage_manager_(DCHECK_NOTNULL(storage_manager)),
         operator_index_(operator_index),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 7585db1..b331a9c 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -57,6 +57,7 @@ bool UpdateOperator::getAllWorkOrders(
                               input_block_id,
                               query_context->getPredicate(predicate_index_),
                               query_context->getUpdateGroup(update_group_index_),
+                              query_id_,
                               query_context->getInsertDestination(relocation_destination_index_),
                               storage_manager,
                               op_index_,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index 78f8fe0..ba2d6cf 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -72,6 +72,7 @@ class UpdateOperator : public RelationalOperator {
    * @param update_group_index The index of a update group (the map of
    *        attribute_ids to Scalars) which should be evaluated to get the new
    *        value for the corresponding attribute.
+   * @param query_id The ID of the query to which this operator belongs.
    *
    * @warning The constructed InsertDestination should belong to relation, but
    *          must NOT contain any pre-existing blocks.
@@ -79,8 +80,10 @@ class UpdateOperator : public RelationalOperator {
   UpdateOperator(const CatalogRelation &relation,
                  const QueryContext::insert_destination_id relocation_destination_index,
                  const QueryContext::predicate_id predicate_index,
-                 const QueryContext::update_group_id update_group_index)
-      : relation_(relation),
+                 const QueryContext::update_group_id update_group_index,
+                 const std::size_t query_id)
+      : RelationalOperator(query_id),
+        relation_(relation),
         relocation_destination_index_(relocation_destination_index),
         predicate_index_(predicate_index),
         update_group_index_(update_group_index),
@@ -130,6 +133,7 @@ class UpdateWorkOrder : public WorkOrder {
    * @param assignments The assignments (the map of attribute_ids to Scalars)
    *        which should be evaluated to get the new value for the corresponding
    *        attribute.
+   * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param input_block_id The block id.
    * @param relocation_destination The InsertDestination to relocate tuples
    *        which can not be updated in-place.
@@ -143,12 +147,14 @@ class UpdateWorkOrder : public WorkOrder {
                   const block_id input_block_id,
                   const Predicate *predicate,
                   const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>> &assignments,
+                  const std::size_t query_id,
                   InsertDestination *relocation_destination,
                   StorageManager *storage_manager,
                   const std::size_t update_operator_index,
                   const tmb::client_id scheduler_client_id,
                   MessageBus *bus)
-      : relation_(relation),
+      : WorkOrder(query_id),
+        relation_(relation),
         input_block_id_(input_block_id),
         predicate_(predicate),
         assignments_(assignments),

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 42cec2a..059865d 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -286,8 +286,15 @@ class WorkOrder {
   }
 
  protected:
-  WorkOrder() {}
+  /**
+   * @brief Constructor.
+   *
+   * @param query_id The ID of the query to which this WorkOrder belongs.
+   **/
+  explicit WorkOrder(const std::size_t query_id)
+      : query_id_(query_id) {}
 
+  const std::size_t query_id_;
   // A vector of preferred NUMA node IDs where this workorder should be executed.
   // These node IDs typically indicate the NUMA node IDs of the input(s) of the
   // workorder. Derived classes should ensure that there are no duplicate entries

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 5d0619a..fd731f7 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -45,6 +45,7 @@ enum WorkOrderType {
 
 message WorkOrder {
   required WorkOrderType work_order_type = 1;
+  required uint64 query_id = 2;
 
   // The convention for extension numbering is that extensions for a particular
   // WorkOrderID should begin from (operator_type + 1) * 16.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 4157d0f..fdd694f 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -76,6 +76,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       LOG(INFO) << "Creating AggregationWorkOrder";
       return new AggregationWorkOrder(
           proto.GetExtension(serialization::AggregationWorkOrder::block_id),
+          proto.query_id(),
           query_context->getAggregationState(
               proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)));
     }
@@ -93,6 +94,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           move(join_key_attributes),
           proto.GetExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable),
           proto.GetExtension(serialization::BuildHashWorkOrder::block_id),
+          proto.query_id(),
           query_context->getJoinHashTable(
               proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index)),
           storage_manager);
@@ -108,12 +110,15 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           storage_manager,
           proto.GetExtension(serialization::DeleteWorkOrder::operator_index),
           shiftboss_client_id,
+          proto.query_id(),
           bus);
     }
     case serialization::DESTROY_HASH: {
       LOG(INFO) << "Creating DestroyHashWorkOrder";
       return new DestroyHashWorkOrder(
-          proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index),
+          proto.GetExtension(
+              serialization::DestroyHashWorkOrder::join_hash_table_index),
+          proto.query_id(),
           query_context);
     }
     case serialization::DROP_TABLE: {
@@ -125,6 +130,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       }
 
       return new DropTableWorkOrder(
+          proto.query_id(),
           move(blocks),
           storage_manager,
           proto.HasExtension(serialization::DropTableWorkOrder::relation_id)
@@ -135,10 +141,12 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
     case serialization::FINALIZE_AGGREGATION: {
       LOG(INFO) << "Creating FinalizeAggregationWorkOrder";
       return new FinalizeAggregationWorkOrder(
-          query_context->releaseAggregationState(
-              proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
+          proto.query_id(),
+          query_context->releaseAggregationState(proto.GetExtension(
+              serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
           query_context->getInsertDestination(
-              proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
+              proto.GetExtension(serialization::FinalizeAggregationWorkOrder::
+                                     insert_destination_index)));
     }
     case serialization::HASH_JOIN: {
       const auto hash_join_work_order_type =
@@ -193,6 +201,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               residual_predicate,
               selection,
               hash_table,
+              proto.query_id(),
               output_destination,
               storage_manager);
         }
@@ -207,6 +216,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               residual_predicate,
               selection,
               hash_table,
+              proto.query_id(),
               output_destination,
               storage_manager);
         }
@@ -229,6 +239,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               selection,
               move(is_selection_on_build),
               hash_table,
+              proto.query_id(),
               output_destination,
               storage_manager);
         }
@@ -243,6 +254,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               residual_predicate,
               selection,
               hash_table,
+              proto.query_id(),
               output_destination,
               storage_manager);
         }
@@ -253,6 +265,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
     case serialization::INSERT: {
       LOG(INFO) << "Creating InsertWorkOrder";
       return new InsertWorkOrder(
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
           query_context->releaseTuple(
@@ -271,6 +284,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index)),
           query_context->getScalarGroup(
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index)),
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index)),
           storage_manager);
@@ -283,6 +297,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::SampleWorkOrder::block_id),
           proto.GetExtension(serialization::SampleWorkOrder::is_block_sample),
           proto.GetExtension(serialization::SampleWorkOrder::percentage),
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::SampleWorkOrder::insert_destination_index)),
           storage_manager);
@@ -292,6 +307,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       return new SaveBlocksWorkOrder(
           proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
           proto.GetExtension(serialization::SaveBlocksWorkOrder::force),
+          proto.query_id(),
           storage_manager);
     }
     case serialization::SELECT: {
@@ -315,6 +331,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           simple_projection ? nullptr
                             : &query_context->getScalarGroup(
                                   proto.GetExtension(serialization::SelectWorkOrder::selection_index)),
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)),
           storage_manager);
@@ -340,6 +357,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           move(runs),
           proto.GetExtension(serialization::SortMergeRunWorkOrder::top_k),
           proto.GetExtension(serialization::SortMergeRunWorkOrder::merge_level),
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index)),
           storage_manager,
@@ -355,6 +373,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
           proto.GetExtension(serialization::SortRunGenerationWorkOrder::block_id),
           query_context->getSortConfig(
               proto.GetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index)),
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index)),
           storage_manager);
@@ -364,6 +383,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       return new TableGeneratorWorkOrder(
           query_context->getGeneratorFunctionHandle(
               proto.GetExtension(serialization::TableGeneratorWorkOrder::generator_function_index)),
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)));
     }
@@ -374,6 +394,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
             proto.GetExtension(serialization::TextScanWorkOrder::filename),
             proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
             proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+            proto.query_id(),
             query_context->getInsertDestination(
                 proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
             storage_manager);
@@ -386,6 +407,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
             text_blob_proto.size(),
             proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
             proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+            proto.query_id(),
             query_context->getInsertDestination(
                 proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
             storage_manager);
@@ -396,6 +418,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
       return new TextSplitWorkOrder(
           proto.GetExtension(serialization::TextSplitWorkOrder::filename),
           proto.GetExtension(serialization::TextSplitWorkOrder::process_escape_sequences),
+          proto.query_id(),
           storage_manager,
           proto.GetExtension(serialization::TextSplitWorkOrder::operator_index),
           shiftboss_client_id,
@@ -411,6 +434,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::UpdateWorkOrder::predicate_index)),
           query_context->getUpdateGroup(
               proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)),
+          proto.query_id(),
           query_context->getInsertDestination(
               proto.GetExtension(serialization::UpdateWorkOrder::insert_destination_index)),
           storage_manager,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index f2207c2..ace7951 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -270,7 +270,7 @@ class AggregationOperatorTest : public ::testing::Test {
     aggr_state_proto->set_estimated_num_entries(estimated_entries);
 
     // Create Operators.
-    op_.reset(new AggregationOperator(*table_, true, aggr_state_index));
+    op_.reset(new AggregationOperator(*table_, true, aggr_state_index, 0));
 
     // Setup the InsertDestination proto in the query context proto.
     const QueryContext::insert_destination_id insert_destination_index =
@@ -281,8 +281,10 @@ class AggregationOperatorTest : public ::testing::Test {
     insert_destination_proto->set_relation_id(result_table_->getID());
     insert_destination_proto->set_relational_op_index(kOpIndex);
 
-    finalize_op_.reset(
-        new FinalizeAggregationOperator(aggr_state_index, *result_table_, insert_destination_index));
+    finalize_op_.reset(new FinalizeAggregationOperator(aggr_state_index,
+                                                       *result_table_,
+                                                       insert_destination_index,
+                                                       0 /* dummy query ID */));
 
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,
@@ -352,7 +354,7 @@ class AggregationOperatorTest : public ::testing::Test {
         serialization::HashTableImplType::LINEAR_OPEN_ADDRESSING);
 
     // Create Operators.
-    op_.reset(new AggregationOperator(*table_, true, aggr_state_index));
+    op_.reset(new AggregationOperator(*table_, true, aggr_state_index, 0));
 
     // Setup the InsertDestination proto in the query context proto.
     const QueryContext::insert_destination_id insert_destination_index =
@@ -363,8 +365,10 @@ class AggregationOperatorTest : public ::testing::Test {
     insert_destination_proto->set_relation_id(result_table_->getID());
     insert_destination_proto->set_relational_op_index(kOpIndex);
 
-    finalize_op_.reset(
-        new FinalizeAggregationOperator(aggr_state_index, *result_table_, insert_destination_index));
+    finalize_op_.reset(new FinalizeAggregationOperator(aggr_state_index,
+                                                       *result_table_,
+                                                       insert_destination_index,
+                                                       0 /* dummy query ID */));
 
     // Set up the QueryContext.
     query_context_.reset(new QueryContext(query_context_proto,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 333c3f0..4ef5a5c 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -336,7 +336,8 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
                             true /* is_stored */,
                             std::vector<attribute_id>(1, dim_col_long.getID()),
                             dim_col_long.getType().isNullable(),
-                            join_hash_table_index));
+                            join_hash_table_index,
+                            0));  // dummy query ID
 
   // Create the prober operator with one selection attribute.
   const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
@@ -368,7 +369,8 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
                            output_destination_index,
                            join_hash_table_index,
                            QueryContext::kInvalidPredicateId /* residual_predicate_index */,
-                           selection_index));
+                           selection_index,
+                           0  /* dummy query ID */));
 
   // Set up the QueryContext.
   query_context_.reset(new QueryContext(query_context_proto,
@@ -421,7 +423,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
   }
 
   // Create cleaner operator.
-  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0  /* dummy query ID */));
   cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
@@ -477,7 +479,8 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
                             true /* is_stored */,
                             std::vector<attribute_id>(1, dim_col_int.getID()),
                             dim_col_int.getType().isNullable(),
-                            join_hash_table_index));
+                            join_hash_table_index,
+                            0));  // dummy query ID
 
   // Create the prober operator with two selection attributes.
   const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
@@ -514,7 +517,8 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
                            output_destination_index,
                            join_hash_table_index,
                            QueryContext::kInvalidPredicateId /* residual_predicate_index */,
-                           selection_index));
+                           selection_index,
+                           0  /* dummy query ID */));
 
   // Set up the QueryContext.
   query_context_.reset(new QueryContext(query_context_proto,
@@ -588,7 +592,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
   }
 
   // Create cleaner operator.
-  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0  /* dummy query ID */));
   cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
@@ -636,7 +640,8 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
                             true /* is_stored */,
                             std::vector<attribute_id>(1, dim_col_char.getID()),
                             dim_col_char.getType().isNullable(),
-                            join_hash_table_index));
+                            join_hash_table_index,
+                            0));  // dummy query ID.
 
   // Create prober operator with one selection attribute.
   const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
@@ -668,7 +673,8 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
                            output_destination_index,
                            join_hash_table_index,
                            QueryContext::kInvalidPredicateId /* residual_predicate_index */,
-                           selection_index));
+                           selection_index,
+                           0  /* dummy query ID */));
 
   // Set up the QueryContext.
   query_context_.reset(new QueryContext(query_context_proto,
@@ -721,7 +727,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
   }
 
   // Create cleaner operator.
-  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0  /* dummy query ID */));
   cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
@@ -770,7 +776,8 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
                             true /* is_stored */,
                             std::vector<attribute_id>(1, dim_col_varchar.getID()),
                             dim_col_varchar.getType().isNullable(),
-                            join_hash_table_index));
+                            join_hash_table_index,
+                            0));  // dummy query ID.
 
   // Create prober operator with two selection attributes.
   const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
@@ -807,7 +814,8 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
                            output_destination_index,
                            join_hash_table_index,
                            QueryContext::kInvalidPredicateId /* residual_predicate_index */,
-                           selection_index));
+                           selection_index,
+                           0  /* dummy query ID */));
 
   // Set up the QueryContext.
   query_context_.reset(new QueryContext(query_context_proto,
@@ -885,7 +893,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
   }
 
   // Create the cleaner operator.
-  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0  /* dummy query ID */));
   cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
@@ -939,7 +947,8 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
                             true /* is_stored */,
                             dim_key_attrs,
                             dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(),
-                            join_hash_table_index));
+                            join_hash_table_index,
+                            0));  // dummy query ID.
 
   // Create the prober operator with two selection attributes.
   const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
@@ -980,7 +989,8 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
                            output_destination_index,
                            join_hash_table_index,
                            QueryContext::kInvalidPredicateId /* residual_predicate_index */,
-                           selection_index));
+                           selection_index,
+                           0  /* dummy query ID */));
 
   // Set up the QueryContext.
   query_context_.reset(new QueryContext(query_context_proto,
@@ -1058,7 +1068,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
   }
 
   // Create cleaner operator.
-  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0  /* dummy query ID */));
   cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 
@@ -1113,7 +1123,8 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
                             true /* is_stored */,
                             dim_key_attrs,
                             dim_col_long.getType().isNullable() || dim_col_varchar.getType().isNullable(),
-                            join_hash_table_index));
+                            join_hash_table_index,
+                            0));  // dummy query ID.
 
   // Create prober operator with two selection attributes.
   const QueryContext::scalar_group_id selection_index = query_context_proto.scalar_groups_size();
@@ -1164,7 +1175,8 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
                            output_destination_index,
                            join_hash_table_index,
                            residual_pred_index,
-                           selection_index));
+                           selection_index,
+                           0  /* dummy query ID */));
 
   // Set up the QueryContext.
   query_context_.reset(new QueryContext(query_context_proto,
@@ -1242,7 +1254,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
   }
 
   // Create cleaner operator.
-  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index));
+  unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(join_hash_table_index, 0  /* dummy query ID */));
   cleaner->informAllBlockingDependenciesMet();
   fetchAndExecuteWorkOrders(cleaner.get());
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index 50c508d..244091f 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -1573,7 +1573,8 @@ class SortMergeRunOperatorTest : public ::testing::Test {
                                              sort_config_index,
                                              merge_factor,
                                              top_k,
-                                             true));
+                                             true,
+                                             0  /* dummy query ID */));
     merge_op_->setOperatorIndex(kOpIndex);
 
     // Set up the QueryContext.
@@ -1616,7 +1617,8 @@ class SortMergeRunOperatorTest : public ::testing::Test {
                                              sort_config_index,
                                              merge_factor,
                                              top_k,
-                                             false));
+                                             false,
+                                             0  /* dummy query ID */));
     merge_op_->setOperatorIndex(kOpIndex);
 
     // Set up the QueryContext.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 7491778..6f24b92 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -358,7 +358,8 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
                                       *result_table_,
                                       insert_destination_index,
                                       sort_config_index,
-                                      true /* is_stored */));
+                                      true /* is_stored */,
+                                      0  /* dummy query ID */));
     run_gen->setOperatorIndex(kOpIndex);
 
     // Set up the QueryContext.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/40542682/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index 1dfad7b..7626686 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -193,7 +193,8 @@ TEST_F(TextScanOperatorTest, ScanTest) {
                            true,
                            false,
                            *relation_,
-                           output_destination_index));
+                           output_destination_index,
+                           0  /* dummy query ID */));
 
   // Setup query_context_.
   query_context_.reset(new QueryContext(query_context_proto,