You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/06/16 04:20:08 UTC

[09/20] incubator-quickstep git commit: Long lived Foreman thread

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
index 8352d55..9204073 100644
--- a/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
+++ b/query_optimizer/tests/ExecutionGeneratorTestRunner.hpp
@@ -61,9 +61,11 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
     bus_.Initialize();
 
-    foreman_.reset(new Foreman(&bus_,
-                               test_database_loader_.catalog_database(),
-                               test_database_loader_.storage_manager()));
+    main_thread_client_id_ = bus_.Connect();
+    bus_.RegisterClientAsSender(main_thread_client_id_, kAdmitRequestMessage);
+    bus_.RegisterClientAsSender(main_thread_client_id_, kPoisonMessage);
+    bus_.RegisterClientAsReceiver(main_thread_client_id_, kWorkloadCompletionMessage);
+
     worker_.reset(new Worker(0, &bus_));
 
     std::vector<client_id> worker_client_ids;
@@ -75,27 +77,20 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
     workers_.reset(new WorkerDirectory(1 /* number of workers */,
                                        worker_client_ids, numa_nodes));
-    foreman_->setWorkerDirectory(workers_.get());
+    foreman_.reset(new Foreman(main_thread_client_id_,
+                               workers_.get(),
+                               &bus_,
+                               test_database_loader_.catalog_database(),
+                               test_database_loader_.storage_manager()));
 
+    foreman_->start();
     worker_->start();
   }
 
   ~ExecutionGeneratorTestRunner() {
-    std::unique_ptr<WorkerMessage> poison_message(WorkerMessage::PoisonMessage());
-    TaggedMessage poison_tagged_message(poison_message.get(),
-                                        sizeof(*poison_message),
-                                        quickstep::kPoisonMessage);
-
-    Address worker_address;
-    MessageStyle single_receiver_style;
-
-    worker_address.AddRecipient(worker_->getBusClientID());
-    bus_.Send(foreman_->getBusClientID(),
-              worker_address,
-              single_receiver_style,
-              std::move(poison_tagged_message));
-
+    QueryExecutionUtil::BroadcastPoisonMessage(main_thread_client_id_, &bus_);
     worker_->join();
+    foreman_->join();
   }
 
   void runTestCase(const std::string &input,
@@ -112,6 +107,8 @@ class ExecutionGeneratorTestRunner : public TextBasedTestRunner {
 
   std::unique_ptr<WorkerDirectory> workers_;
 
+  tmb::client_id main_thread_client_id_;
+
   // This map is needed for InsertDestination and some operators that send
   // messages to Foreman directly. To know the reason behind the design of this
   // map, see the note in InsertDestination.hpp.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/query_optimizer/tests/TestDatabaseLoader.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/tests/TestDatabaseLoader.cpp b/query_optimizer/tests/TestDatabaseLoader.cpp
index 2de69b6..764ff2f 100644
--- a/query_optimizer/tests/TestDatabaseLoader.cpp
+++ b/query_optimizer/tests/TestDatabaseLoader.cpp
@@ -122,6 +122,7 @@ void TestDatabaseLoader::loadTestRelation() {
                                          nullptr,
                                          &storage_manager_,
                                          0 /* dummy op index */,
+                                         0,  // dummy query ID.
                                          scheduler_client_id_,
                                          &bus_);
   int sign = 1;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/DeleteOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 47e36e9..933918b 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -132,6 +132,7 @@ void DeleteWorkOrder::execute() {
   proto.set_operator_index(delete_operator_index_);
   proto.set_block_id(input_block_id_);
   proto.set_relation_id(input_relation_.getID());
+  proto.set_query_id(query_id_);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const std::size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/DeleteOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index aa8a688..74da8c1 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -174,6 +174,7 @@ class DeleteWorkOrder : public WorkOrder {
   StorageManager *storage_manager_;
 
   const std::size_t delete_operator_index_;
+
   const tmb::client_id scheduler_client_id_;
   MessageBus *bus_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/HashJoinOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 6f4271d..9762f04 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -644,14 +644,14 @@ class HashOuterJoinWorkOrder : public WorkOrder {
    * @param join_key_attributes The IDs of equijoin attributes in \c
    *        probe_relation.
    * @param any_join_key_attributes_nullable If any attribute is nullable.
-   * @param hash_table The JoinHashTable to use.
+   * @param lookup_block_id The block id of the probe_relation.
    * @param selection A list of Scalars corresponding to the relation attributes
    *        in \c output_destination. Each Scalar is evaluated for the joined
    *        tuples, and the resulting value is inserted into the join result.
    * @param is_selection_on_build Whether each Scalar in the \p selection vector
    *        is using attributes from the build relation as input. Note that the
    *        length of this vector should equal the length of \p selection.
-   * @param lookup_block_id The block id of the probe_relation.
+   * @param hash_table The JoinHashTable to use.
    * @param output_destination The InsertDestination to insert the join results.
    * @param storage_manager The StorageManager to use.
    **/

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/RebuildWorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/RebuildWorkOrder.hpp b/relational_operators/RebuildWorkOrder.hpp
index 86f8eaf..3125447 100644
--- a/relational_operators/RebuildWorkOrder.hpp
+++ b/relational_operators/RebuildWorkOrder.hpp
@@ -85,6 +85,7 @@ class RebuildWorkOrder : public WorkOrder {
     proto.set_operator_index(input_operator_index_);
     proto.set_block_id(block_ref_->getID());
     proto.set_relation_id(input_relation_id_);
+    proto.set_query_id(query_id_);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index e398d62..1603b78 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -327,6 +327,7 @@ void SortMergeRunWorkOrder::execute() {
 
   // Send completion message to operator.
   FeedbackMessage msg(SortMergeRunOperator::kRunOutputMessage,
+                      getQueryID(),
                       operator_index_,
                       serialized_output.first,
                       serialized_output.second);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 1b2979e..f103b0e 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -114,6 +114,7 @@ void UpdateWorkOrder::execute() {
   proto.set_operator_index(update_operator_index_);
   proto.set_block_id(input_block_id_);
   proto.set_relation_id(relation_.getID());
+  proto.set_query_id(query_id_);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const std::size_t proto_length = proto.ByteSize();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index f6c5053..4471a17 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -184,6 +184,7 @@ class UpdateWorkOrder : public WorkOrder {
   StorageManager *storage_manager_;
 
   const std::size_t update_operator_index_;
+
   const tmb::client_id scheduler_client_id_;
   MessageBus *bus_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/WorkOrder.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.hpp b/relational_operators/WorkOrder.hpp
index 059865d..df195cc 100644
--- a/relational_operators/WorkOrder.hpp
+++ b/relational_operators/WorkOrder.hpp
@@ -65,20 +65,25 @@ class WorkOrder {
    *       relational operator.
    */
   struct FeedbackMessageHeader {
+    std::size_t query_id;
     std::size_t rel_op_index;
     std::size_t payload_size;
     FeedbackMessageType payload_type;
 
     /**
      * @brief Header constructor.
+     *
+     * @param query_id The ID of the query.
      * @param relational_op_index Index of the relation operator.
      * @param payload_size Size of the payload of the message.
      * @param payload_type Type of payload.
      */
-    FeedbackMessageHeader(const std::size_t relational_op_index,
+    FeedbackMessageHeader(const std::size_t query_id,
+                          const std::size_t relational_op_index,
                           const std::size_t payload_size,
                           const FeedbackMessageType payload_type)
-        : rel_op_index(relational_op_index),
+        : query_id(query_id),
+          rel_op_index(relational_op_index),
           payload_size(payload_size),
           payload_type(payload_type) {}
   };
@@ -93,17 +98,19 @@ class WorkOrder {
      * @brief Feedback message constructor.
      *
      * @param type Type of the message.
+     * @param query_id The ID of the query.
      * @param rel_op_index Relational operator index.
      * @param payload Blob of payload.
      * @param payload_size Size of the payload blob.
      * @param ownership Whether to take ownership of the payload blob.
      */
     FeedbackMessage(const FeedbackMessageType type,
+                    const std::size_t query_id,
                     const std::size_t rel_op_index,
                     void *payload,
                     const std::size_t payload_size,
                     const bool ownership = true)
-        : header_(rel_op_index, payload_size, type),
+        : header_(query_id, rel_op_index, payload_size, type),
           payload_(payload),
           ownership_(ownership) {}
 
@@ -285,6 +292,13 @@ class WorkOrder {
         " receiver thread with TMB client ID " << receiver_id;
   }
 
+  /**
+   * @brief Get the ID of the query which this WorkOrder belongs to.
+   **/
+  inline const std::size_t getQueryID() const {
+    return query_id_;
+  }
+
  protected:
   /**
    * @brief Constructor.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/AggregationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index fdcc54f..fd4692a 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -228,6 +228,8 @@ class AggregationOperatorTest : public ::testing::Test {
 
     // Setup the aggregation state proto in the query context proto.
     serialization::QueryContext query_context_proto;
+    query_context_proto.set_query_id(0);  // dummy query ID.
+
     const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
     serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
     aggr_state_proto->set_relation_id(table_->getID());
@@ -319,6 +321,8 @@ class AggregationOperatorTest : public ::testing::Test {
 
     // Setup the aggregation state proto in the query context proto.
     serialization::QueryContext query_context_proto;
+    query_context_proto.set_query_id(0);  // dummy query ID.
+
     const QueryContext::aggregation_state_id aggr_state_index = query_context_proto.aggregation_states_size();
     serialization::AggregationOperationState *aggr_state_proto = query_context_proto.add_aggregation_states();
     aggr_state_proto->set_relation_id(table_->getID());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/HashJoinOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 074b603..9c34170 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -294,6 +294,7 @@ class HashJoinOperatorTest : public ::testing::TestWithParam<HashTableImplType>
 TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -434,6 +435,7 @@ TEST_P(HashJoinOperatorTest, LongKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -604,6 +606,7 @@ TEST_P(HashJoinOperatorTest, IntDuplicateKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -739,6 +742,7 @@ TEST_P(HashJoinOperatorTest, CharKeyCartesianProductHashJoinTest) {
 TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -906,6 +910,7 @@ TEST_P(HashJoinOperatorTest, VarCharDuplicateKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();
@@ -1083,6 +1088,7 @@ TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinTest) {
 TEST_P(HashJoinOperatorTest, CompositeKeyHashJoinWithResidualPredicateTest) {
   // Setup the hash table proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   const QueryContext::join_hash_table_id join_hash_table_index =
       query_context_proto.join_hash_tables_size();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index fc10671..46bce5b 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -189,6 +189,7 @@ class RunTest : public ::testing::Test {
                                        nullptr,
                                        storage_manager_.get(),
                                        kOpIndex,
+                                       0,  // dummy query ID.
                                        foreman_client_id_,
                                        &bus_));
   }
@@ -433,6 +434,7 @@ class RunMergerTest : public ::testing::Test {
                                        nullptr,
                                        storage_manager_.get(),
                                        kOpIndex,
+                                       0,  // dummy query ID.
                                        foreman_client_id_,
                                        &bus_));
   }
@@ -1269,6 +1271,8 @@ class SortMergeRunOperatorTest : public ::testing::Test {
     ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID());
     ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID());
 
+    query_context_proto_.set_query_id(0);  // dummy query ID.
+
     // Setup the InsertDestination proto in the query context proto.
     insert_destination_index_ = query_context_proto_.insert_destinations_size();
     serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 71a80e4..bd682c2 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -328,6 +328,7 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
                    const std::vector<bool> &null_ordering) {
     // Setup the InsertDestination proto in the query context proto.
     serialization::QueryContext query_context_proto;
+    query_context_proto.set_query_id(0);  // dummy query ID.
 
     const QueryContext::insert_destination_id insert_destination_index =
         query_context_proto.insert_destinations_size();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index 5860745..5bcbee5 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -180,6 +180,7 @@ TEST_F(TextScanOperatorTest, ScanTest) {
 
   // Setup the InsertDestination proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size();
   serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations();

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 354bed4..2866c5f 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -60,6 +60,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
                                      const StorageBlockLayout *layout,
                                      StorageManager *storage_manager,
                                      const std::size_t relational_op_index,
+                                     const std::size_t query_id,
                                      const tmb::client_id scheduler_client_id,
                                      tmb::MessageBus *bus)
     : thread_id_map_(*ClientIDMap::Instance()),
@@ -67,6 +68,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
       relation_(relation),
       layout_(layout),
       relational_op_index_(relational_op_index),
+      query_id_(query_id),
       scheduler_client_id_(scheduler_client_id),
       bus_(DCHECK_NOTNULL(bus)) {
   if (layout_ == nullptr) {
@@ -74,11 +76,13 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
   }
 }
 
-InsertDestination* InsertDestination::ReconstructFromProto(const serialization::InsertDestination &proto,
-                                                           const CatalogRelationSchema &relation,
-                                                           StorageManager *storage_manager,
-                                                           const tmb::client_id scheduler_client_id,
-                                                           tmb::MessageBus *bus) {
+InsertDestination* InsertDestination::ReconstructFromProto(
+    const std::size_t query_id,
+    const serialization::InsertDestination &proto,
+    const CatalogRelationSchema &relation,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
   DCHECK(ProtoIsValid(proto, relation));
 
   StorageBlockLayout *layout = nullptr;
@@ -93,6 +97,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
                                                     layout,
                                                     storage_manager,
                                                     proto.relational_op_index(),
+                                                    query_id,
                                                     scheduler_client_id,
                                                     bus);
     }
@@ -107,6 +112,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
                                             storage_manager,
                                             move(blocks),
                                             proto.relational_op_index(),
+                                            query_id,
                                             scheduler_client_id,
                                             bus);
     }
@@ -134,6 +140,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
           storage_manager,
           move(partitions),
           proto.relational_op_index(),
+          query_id,
           scheduler_client_id,
           bus);
     }
@@ -262,6 +269,7 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
   serialization::CatalogRelationNewBlockMessage proto;
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -309,6 +317,7 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
   serialization::CatalogRelationNewBlockMessage proto;
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -385,21 +394,29 @@ const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInterna
   return done_block_ids_;
 }
 
-PartitionAwareInsertDestination::PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
-                                                                 const CatalogRelationSchema &relation,
-                                                                 const StorageBlockLayout *layout,
-                                                                 StorageManager *storage_manager,
-                                                                 vector<vector<block_id>> &&partitions,
-                                                                 const std::size_t relational_op_index,
-                                                                 const tmb::client_id scheduler_client_id,
-                                                                 tmb::MessageBus *bus)
-    : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+PartitionAwareInsertDestination::PartitionAwareInsertDestination(
+    PartitionSchemeHeader *partition_scheme_header,
+    const CatalogRelationSchema &relation,
+    const StorageBlockLayout *layout,
+    StorageManager *storage_manager,
+    vector<vector<block_id>> &&partitions,
+    const std::size_t relational_op_index,
+    const std::size_t query_id,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus)
+    : InsertDestination(relation,
+                        layout,
+                        storage_manager,
+                        relational_op_index,
+                        query_id,
+                        scheduler_client_id,
+                        bus),
       partition_scheme_header_(DCHECK_NOTNULL(partition_scheme_header)),
       available_block_refs_(partition_scheme_header_->getNumPartitions()),
       available_block_ids_(move(partitions)),
       done_block_ids_(partition_scheme_header_->getNumPartitions()),
-      mutexes_for_partition_(new SpinMutex[partition_scheme_header_->getNumPartitions()]) {
-}
+      mutexes_for_partition_(
+          new SpinMutex[partition_scheme_header_->getNumPartitions()]) {}
 
 MutableBlockReference PartitionAwareInsertDestination::createNewBlock() {
   FATAL_ERROR("PartitionAwareInsertDestination::createNewBlock needs a partition id as an argument.");
@@ -415,6 +432,7 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
   proto.set_partition_id(part_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/8230b124/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 670cd6c..5ff33f5 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -78,6 +78,7 @@ class InsertDestination : public InsertDestinationInterface {
    * @param storage_manager The StorageManager to use.
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
+   * @param query_id The ID of this query.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
@@ -85,6 +86,7 @@ class InsertDestination : public InsertDestinationInterface {
                     const StorageBlockLayout *layout,
                     StorageManager *storage_manager,
                     const std::size_t relational_op_index,
+                    const std::size_t query_id,
                     const tmb::client_id scheduler_client_id,
                     tmb::MessageBus *bus);
 
@@ -98,6 +100,7 @@ class InsertDestination : public InsertDestinationInterface {
    * @brief A factory method to generate the InsertDestination from the
    *        serialized Protocol Buffer representation.
    *
+   * @param query_id The ID of this query.
    * @param proto A serialized Protocol Buffer representation of an
    *        InsertDestination, originally generated by the optimizer.
    * @param relation The relation to insert tuples into.
@@ -107,11 +110,13 @@ class InsertDestination : public InsertDestinationInterface {
    *
    * @return The constructed InsertDestination.
    */
-  static InsertDestination* ReconstructFromProto(const serialization::InsertDestination &proto,
-                                                 const CatalogRelationSchema &relation,
-                                                 StorageManager *storage_manager,
-                                                 const tmb::client_id scheduler_client_id,
-                                                 tmb::MessageBus *bus);
+  static InsertDestination* ReconstructFromProto(
+      const std::size_t query_id,
+      const serialization::InsertDestination &proto,
+      const CatalogRelationSchema &relation,
+      StorageManager *storage_manager,
+      const tmb::client_id scheduler_client_id,
+      tmb::MessageBus *bus);
 
   /**
    * @brief Check whether a serialized InsertDestination is fully-formed and
@@ -211,6 +216,7 @@ class InsertDestination : public InsertDestinationInterface {
     proto.set_operator_index(relational_op_index_);
     proto.set_block_id(id);
     proto.set_relation_id(relation_.getID());
+    proto.set_query_id(query_id_);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -253,6 +259,10 @@ class InsertDestination : public InsertDestinationInterface {
         " ID " << scheduler_client_id_;
   }
 
+  inline const std::size_t getQueryID() const {
+    return query_id_;
+  }
+
   const ClientIDMap &thread_id_map_;
 
   StorageManager *storage_manager_;
@@ -260,6 +270,7 @@ class InsertDestination : public InsertDestinationInterface {
 
   std::unique_ptr<const StorageBlockLayout> layout_;
   const std::size_t relational_op_index_;
+  const std::size_t query_id_;
 
   tmb::client_id scheduler_client_id_;
   tmb::MessageBus *bus_;
@@ -288,10 +299,16 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
                                      const StorageBlockLayout *layout,
                                      StorageManager *storage_manager,
                                      const std::size_t relational_op_index,
+                                     const std::size_t query_id,
                                      const tmb::client_id scheduler_client_id,
                                      tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
-  }
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus) {}
 
   ~AlwaysCreateBlockInsertDestination() override {
   }
@@ -334,16 +351,23 @@ class BlockPoolInsertDestination : public InsertDestination {
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
+   * @param query_id The ID of the query.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
   BlockPoolInsertDestination(const CatalogRelationSchema &relation,
                              const StorageBlockLayout *layout,
                              StorageManager *storage_manager,
                              const std::size_t relational_op_index,
+                             const std::size_t query_id,
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
-  }
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus) {}
 
   /**
    * @brief Constructor.
@@ -363,9 +387,16 @@ class BlockPoolInsertDestination : public InsertDestination {
                              StorageManager *storage_manager,
                              std::vector<block_id> &&blocks,
                              const std::size_t relational_op_index,
+                             const std::size_t query_id,
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus),
         available_block_ids_(std::move(blocks)) {
     // TODO(chasseur): Once block fill statistics are available, replace this
     // with something smarter.
@@ -386,7 +417,6 @@ class BlockPoolInsertDestination : public InsertDestination {
   MutableBlockReference createNewBlock() override;
 
  private:
-  FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
   FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
 
   // A vector of references to blocks which are loaded in memory.
@@ -416,17 +446,20 @@ class PartitionAwareInsertDestination : public InsertDestination {
    * @param partitions The blocks in partitions.
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
+   * @param query_id The ID of the query.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
-  PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
-                                  const CatalogRelationSchema &relation,
-                                  const StorageBlockLayout *layout,
-                                  StorageManager *storage_manager,
-                                  std::vector<std::vector<block_id>> &&partitions,
-                                  const std::size_t relational_op_index,
-                                  const tmb::client_id scheduler_client_id,
-                                  tmb::MessageBus *bus);
+  PartitionAwareInsertDestination(
+      PartitionSchemeHeader *partition_scheme_header,
+      const CatalogRelationSchema &relation,
+      const StorageBlockLayout *layout,
+      StorageManager *storage_manager,
+      std::vector<std::vector<block_id>> &&partitions,
+      const std::size_t relational_op_index,
+      const std::size_t query_id,
+      const tmb::client_id scheduler_client_id,
+      tmb::MessageBus *bus);
 
   ~PartitionAwareInsertDestination() override {
     delete[] mutexes_for_partition_;