You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by hb...@apache.org on 2016/06/09 16:13:05 UTC

[2/4] incubator-quickstep git commit: Created QueryManager class and tests.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/SortMergeRunOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortMergeRunOperator_unittest.cpp b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
index fc10671..46fb7ae 100644
--- a/relational_operators/tests/SortMergeRunOperator_unittest.cpp
+++ b/relational_operators/tests/SortMergeRunOperator_unittest.cpp
@@ -189,6 +189,7 @@ class RunTest : public ::testing::Test {
                                        nullptr,
                                        storage_manager_.get(),
                                        kOpIndex,
+                                       0,  // dummy query ID.
                                        foreman_client_id_,
                                        &bus_));
   }
@@ -433,6 +434,7 @@ class RunMergerTest : public ::testing::Test {
                                        nullptr,
                                        storage_manager_.get(),
                                        kOpIndex,
+                                       0,  // dummy query ID.
                                        foreman_client_id_,
                                        &bus_));
   }
@@ -1269,9 +1271,12 @@ class SortMergeRunOperatorTest : public ::testing::Test {
     ASSERT_EQ(null_col3_, result_table_->getAttributeByName("null-col-3")->getID());
     ASSERT_EQ(tid_col_, result_table_->getAttributeByName("tid")->getID());
 
+    query_context_proto_.set_query_id(0);  // dummy query ID.
+
     // Setup the InsertDestination proto in the query context proto.
     insert_destination_index_ = query_context_proto_.insert_destinations_size();
     serialization::InsertDestination *insert_destination_proto = query_context_proto_.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto_.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(result_table_id);
@@ -1291,6 +1296,7 @@ class SortMergeRunOperatorTest : public ::testing::Test {
 
     run_destination_index_ = query_context_proto_.insert_destinations_size();
     insert_destination_proto = query_context_proto_.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto_.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(run_table_id);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
index 71a80e4..3eeb7e9 100644
--- a/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
+++ b/relational_operators/tests/SortRunGenerationOperator_unittest.cpp
@@ -328,10 +328,12 @@ class SortRunGenerationOperatorTest : public ::testing::Test {
                    const std::vector<bool> &null_ordering) {
     // Setup the InsertDestination proto in the query context proto.
     serialization::QueryContext query_context_proto;
+    query_context_proto.set_query_id(0);  // dummy query ID.
 
     const QueryContext::insert_destination_id insert_destination_index =
         query_context_proto.insert_destinations_size();
     serialization::InsertDestination *insert_destination_proto = query_context_proto.add_insert_destinations();
+    insert_destination_proto->set_query_id(query_context_proto.query_id());
 
     insert_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
     insert_destination_proto->set_relation_id(result_table_->getID());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index ef6fc2d..a972a37 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -180,9 +180,11 @@ TEST_F(TextScanOperatorTest, ScanTest) {
 
   // Setup the InsertDestination proto in the query context proto.
   serialization::QueryContext query_context_proto;
+  query_context_proto.set_query_id(0);  // dummy query ID.
 
   QueryContext::insert_destination_id output_destination_index = query_context_proto.insert_destinations_size();
   serialization::InsertDestination *output_destination_proto = query_context_proto.add_insert_destinations();
+  output_destination_proto->set_query_id(query_context_proto.query_id());
 
   output_destination_proto->set_insert_destination_type(serialization::InsertDestinationType::BLOCK_POOL);
   output_destination_proto->set_relation_id(relation_->getID());

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.cpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 354bed4..5e4dd28 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -60,6 +60,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
                                      const StorageBlockLayout *layout,
                                      StorageManager *storage_manager,
                                      const std::size_t relational_op_index,
+                                     const std::size_t query_id,
                                      const tmb::client_id scheduler_client_id,
                                      tmb::MessageBus *bus)
     : thread_id_map_(*ClientIDMap::Instance()),
@@ -67,6 +68,7 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
       relation_(relation),
       layout_(layout),
       relational_op_index_(relational_op_index),
+      query_id_(query_id),
       scheduler_client_id_(scheduler_client_id),
       bus_(DCHECK_NOTNULL(bus)) {
   if (layout_ == nullptr) {
@@ -74,11 +76,12 @@ InsertDestination::InsertDestination(const CatalogRelationSchema &relation,
   }
 }
 
-InsertDestination* InsertDestination::ReconstructFromProto(const serialization::InsertDestination &proto,
-                                                           const CatalogRelationSchema &relation,
-                                                           StorageManager *storage_manager,
-                                                           const tmb::client_id scheduler_client_id,
-                                                           tmb::MessageBus *bus) {
+InsertDestination* InsertDestination::ReconstructFromProto(
+    const serialization::InsertDestination &proto,
+    const CatalogRelationSchema &relation,
+    StorageManager *storage_manager,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus) {
   DCHECK(ProtoIsValid(proto, relation));
 
   StorageBlockLayout *layout = nullptr;
@@ -93,6 +96,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
                                                     layout,
                                                     storage_manager,
                                                     proto.relational_op_index(),
+                                                    proto.query_id(),
                                                     scheduler_client_id,
                                                     bus);
     }
@@ -107,6 +111,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
                                             storage_manager,
                                             move(blocks),
                                             proto.relational_op_index(),
+                                            proto.query_id(),
                                             scheduler_client_id,
                                             bus);
     }
@@ -134,6 +139,7 @@ InsertDestination* InsertDestination::ReconstructFromProto(const serialization::
           storage_manager,
           move(partitions),
           proto.relational_op_index(),
+          proto.query_id(),
           scheduler_client_id,
           bus);
     }
@@ -262,6 +268,7 @@ MutableBlockReference AlwaysCreateBlockInsertDestination::createNewBlock() {
   serialization::CatalogRelationNewBlockMessage proto;
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -309,6 +316,7 @@ MutableBlockReference BlockPoolInsertDestination::createNewBlock() {
   serialization::CatalogRelationNewBlockMessage proto;
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));
@@ -385,21 +393,29 @@ const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInterna
   return done_block_ids_;
 }
 
-PartitionAwareInsertDestination::PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
-                                                                 const CatalogRelationSchema &relation,
-                                                                 const StorageBlockLayout *layout,
-                                                                 StorageManager *storage_manager,
-                                                                 vector<vector<block_id>> &&partitions,
-                                                                 const std::size_t relational_op_index,
-                                                                 const tmb::client_id scheduler_client_id,
-                                                                 tmb::MessageBus *bus)
-    : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+PartitionAwareInsertDestination::PartitionAwareInsertDestination(
+    PartitionSchemeHeader *partition_scheme_header,
+    const CatalogRelationSchema &relation,
+    const StorageBlockLayout *layout,
+    StorageManager *storage_manager,
+    vector<vector<block_id>> &&partitions,
+    const std::size_t relational_op_index,
+    const std::size_t query_id,
+    const tmb::client_id scheduler_client_id,
+    tmb::MessageBus *bus)
+    : InsertDestination(relation,
+                        layout,
+                        storage_manager,
+                        relational_op_index,
+                        query_id,
+                        scheduler_client_id,
+                        bus),
       partition_scheme_header_(DCHECK_NOTNULL(partition_scheme_header)),
       available_block_refs_(partition_scheme_header_->getNumPartitions()),
       available_block_ids_(move(partitions)),
       done_block_ids_(partition_scheme_header_->getNumPartitions()),
-      mutexes_for_partition_(new SpinMutex[partition_scheme_header_->getNumPartitions()]) {
-}
+      mutexes_for_partition_(
+          new SpinMutex[partition_scheme_header_->getNumPartitions()]) {}
 
 MutableBlockReference PartitionAwareInsertDestination::createNewBlock() {
   FATAL_ERROR("PartitionAwareInsertDestination::createNewBlock needs a partition id as an argument.");
@@ -415,6 +431,7 @@ MutableBlockReference PartitionAwareInsertDestination::createNewBlockInPartition
   proto.set_relation_id(relation_.getID());
   proto.set_block_id(new_id);
   proto.set_partition_id(part_id);
+  proto.set_query_id(getQueryID());
 
   const size_t proto_length = proto.ByteSize();
   char *proto_bytes = static_cast<char*>(malloc(proto_length));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.hpp
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 670cd6c..6968149 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -78,6 +78,7 @@ class InsertDestination : public InsertDestinationInterface {
    * @param storage_manager The StorageManager to use.
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
+   * @param query_id The ID of this query.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
@@ -85,6 +86,7 @@ class InsertDestination : public InsertDestinationInterface {
                     const StorageBlockLayout *layout,
                     StorageManager *storage_manager,
                     const std::size_t relational_op_index,
+                    const std::size_t query_id,
                     const tmb::client_id scheduler_client_id,
                     tmb::MessageBus *bus);
 
@@ -211,6 +213,7 @@ class InsertDestination : public InsertDestinationInterface {
     proto.set_operator_index(relational_op_index_);
     proto.set_block_id(id);
     proto.set_relation_id(relation_.getID());
+    proto.set_query_id(query_id_);
 
     // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
     const std::size_t proto_length = proto.ByteSize();
@@ -253,6 +256,10 @@ class InsertDestination : public InsertDestinationInterface {
         " ID " << scheduler_client_id_;
   }
 
+  inline const std::size_t getQueryID() const {
+    return query_id_;
+  }
+
   const ClientIDMap &thread_id_map_;
 
   StorageManager *storage_manager_;
@@ -260,6 +267,7 @@ class InsertDestination : public InsertDestinationInterface {
 
   std::unique_ptr<const StorageBlockLayout> layout_;
   const std::size_t relational_op_index_;
+  const std::size_t query_id_;
 
   tmb::client_id scheduler_client_id_;
   tmb::MessageBus *bus_;
@@ -288,10 +296,16 @@ class AlwaysCreateBlockInsertDestination : public InsertDestination {
                                      const StorageBlockLayout *layout,
                                      StorageManager *storage_manager,
                                      const std::size_t relational_op_index,
+                                     const std::size_t query_id,
                                      const tmb::client_id scheduler_client_id,
                                      tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
-  }
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus) {}
 
   ~AlwaysCreateBlockInsertDestination() override {
   }
@@ -334,16 +348,23 @@ class BlockPoolInsertDestination : public InsertDestination {
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
+   * @param query_id The ID of the query.
    * @param bus A pointer to the TMB.
    **/
   BlockPoolInsertDestination(const CatalogRelationSchema &relation,
                              const StorageBlockLayout *layout,
                              StorageManager *storage_manager,
                              const std::size_t relational_op_index,
+                             const std::size_t query_id,
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus) {
-  }
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus) {}
 
   /**
    * @brief Constructor.
@@ -363,9 +384,16 @@ class BlockPoolInsertDestination : public InsertDestination {
                              StorageManager *storage_manager,
                              std::vector<block_id> &&blocks,
                              const std::size_t relational_op_index,
+                             const std::size_t query_id,
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus)
-      : InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
+      : InsertDestination(relation,
+                          layout,
+                          storage_manager,
+                          relational_op_index,
+                          query_id,
+                          scheduler_client_id,
+                          bus),
         available_block_ids_(std::move(blocks)) {
     // TODO(chasseur): Once block fill statistics are available, replace this
     // with something smarter.
@@ -386,7 +414,6 @@ class BlockPoolInsertDestination : public InsertDestination {
   MutableBlockReference createNewBlock() override;
 
  private:
-  FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
   FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
 
   // A vector of references to blocks which are loaded in memory.
@@ -416,17 +443,20 @@ class PartitionAwareInsertDestination : public InsertDestination {
    * @param partitions The blocks in partitions.
    * @param relational_op_index The index of the relational operator in the
    *        QueryPlan DAG that has outputs.
+   * @param query_id The ID of the query.
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    **/
-  PartitionAwareInsertDestination(PartitionSchemeHeader *partition_scheme_header,
-                                  const CatalogRelationSchema &relation,
-                                  const StorageBlockLayout *layout,
-                                  StorageManager *storage_manager,
-                                  std::vector<std::vector<block_id>> &&partitions,
-                                  const std::size_t relational_op_index,
-                                  const tmb::client_id scheduler_client_id,
-                                  tmb::MessageBus *bus);
+  PartitionAwareInsertDestination(
+      PartitionSchemeHeader *partition_scheme_header,
+      const CatalogRelationSchema &relation,
+      const StorageBlockLayout *layout,
+      StorageManager *storage_manager,
+      std::vector<std::vector<block_id>> &&partitions,
+      const std::size_t relational_op_index,
+      const std::size_t query_id,
+      const tmb::client_id scheduler_client_id,
+      tmb::MessageBus *bus);
 
   ~PartitionAwareInsertDestination() override {
     delete[] mutexes_for_partition_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/a690455e/storage/InsertDestination.proto
----------------------------------------------------------------------
diff --git a/storage/InsertDestination.proto b/storage/InsertDestination.proto
index 6083539..a3d8acf 100644
--- a/storage/InsertDestination.proto
+++ b/storage/InsertDestination.proto
@@ -34,6 +34,7 @@ message InsertDestination {
   optional StorageBlockLayoutDescription layout = 3;
 
   required uint64 relational_op_index = 4;
+  required uint64 query_id = 5;
 
   // The convention for extension numbering is that extensions for a particular
   // tInsertDestinationType should begin from (insert_destination_type + 1) * 16.