You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by zu...@apache.org on 2016/06/09 05:41:12 UTC

[49/50] [abbrv] incubator-quickstep git commit: Serialized WorkOrders as proto.

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 6bf5719..ac81806 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -23,9 +23,11 @@
 #include <vector>
 
 #include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/SortMergeRunOperator.pb.h"
 #include "relational_operators/SortMergeRunOperatorHelpers.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "threading/ThreadIDBasedMap.hpp"
 
 #include "glog/logging.h"
@@ -69,6 +71,71 @@ bool SortMergeRunOperator::getAllWorkOrders(
   return generateWorkOrders(container, query_context, storage_manager, scheduler_client_id, bus);
 }
 
+// On first use sets up the merge tree, then converts every merge job that is
+// currently available into a serialized work order added to 'container'.
+// Returns the completion flag reported by MergeTree::getMergeJobs().
+bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  // Merge tree initialization is skipped once started_ has been set.
+  if (!started_) {
+    if (input_relation_is_stored_) {
+      // Runs come directly from a base relation (only possible when that
+      // relation is stored sorted), so all input runs are already known and
+      // the tree can be initialized completely.
+      merge_tree_.initializeTree(input_relation_block_ids_.size());
+      started_ = true;
+      initializeInputRuns();
+    } else if (!input_stream_done_) {
+      // Runs are pipelined from the sorted run generation operator, so start
+      // the merge tree in pipeline mode.
+      merge_tree_.initializeForPipeline();
+      started_ = true;
+      initializeInputRuns();
+    }
+  }
+
+  // Collect the merge jobs the merge tree hands out on this call; the flag
+  // it returns becomes this method's result.
+  std::vector<MergeTree::MergeJob> merge_jobs;
+  const bool generation_done = merge_tree_.getMergeJobs(&merge_jobs);
+
+  // Each merge job becomes exactly one work order proto.
+  for (MergeTree::MergeJob &merge_job : merge_jobs) {
+    container->addWorkOrderProto(createWorkOrderProto(&merge_job), op_index_);
+  }
+
+  return generation_done;
+}
+
+// Builds the SORT_MERGE_RUN work order proto for a single merge job.
+serialization::WorkOrder* SortMergeRunOperator::createWorkOrderProto(
+    merge_run_operator::MergeTree::MergeJob *job) {
+  DCHECK(job != nullptr);
+  DCHECK(!job->runs.empty());
+
+  // Jobs above level 0 use the run relation's id; level-0 jobs use the input
+  // relation's. Final-level jobs use the output destination index, all other
+  // jobs the run-block destination index.
+  const auto source_relation_id = job->level > 0 ? run_relation_.getID() : input_relation_.getID();
+  const auto destination_index = job->is_final_level ? output_destination_index_ : run_block_destination_index_;
+
+  auto *work_order = new serialization::WorkOrder;
+  work_order->set_work_order_type(serialization::SORT_MERGE_RUN);
+  work_order->SetExtension(serialization::SortMergeRunWorkOrder::operator_index, op_index_);
+  work_order->SetExtension(serialization::SortMergeRunWorkOrder::sort_config_index, sort_config_index_);
+  work_order->SetExtension(serialization::SortMergeRunWorkOrder::top_k, top_k_);
+  work_order->SetExtension(serialization::SortMergeRunWorkOrder::merge_level, job->level);
+  work_order->SetExtension(serialization::SortMergeRunWorkOrder::relation_id, source_relation_id);
+  work_order->SetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index, destination_index);
+
+  // Copy every run of the job, block by block, into the repeated 'runs' field.
+  for (const merge_run_operator::Run &input_run : job->runs) {
+    serialization::Run *serialized_run = work_order->AddExtension(serialization::SortMergeRunWorkOrder::runs);
+    for (const block_id run_block : input_run) {
+      serialized_run->add_blocks(run_block);
+    }
+  }
+  return work_order;
+}
+
 WorkOrder *SortMergeRunOperator::createWorkOrder(
     merge_run_operator::MergeTree::MergeJob *job,
     QueryContext *query_context,

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index cfff8b9..177836f 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -44,8 +44,11 @@ namespace quickstep {
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
+namespace serialization { class WorkOrder; }
+
 /**
  * @defgroup SortMergeRun Merging Sorted Runs
  * @ingroup Sort
@@ -132,6 +135,8 @@ class SortMergeRunOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id,
                       const relation_id input_relation_id) override {
     input_relation_block_ids_.push_back(input_block_id);
@@ -182,6 +187,13 @@ class SortMergeRunOperator : public RelationalOperator {
                              const tmb::client_id scheduler_client_id,
                              tmb::MessageBus *bus);
 
+  /**
+   * @brief Create the serialized Work Order proto for one merge job.
+   * @param job The merge job; must be non-null and have at least one run.
+   * @return A serialization::WorkOrder allocated with new.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(merge_run_operator::MergeTree::MergeJob *job);
+
   const CatalogRelation &input_relation_;
 
   const CatalogRelation &output_relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/SortRunGenerationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.cpp b/relational_operators/SortRunGenerationOperator.cpp
index 37b8fb8..da9d5ee 100644
--- a/relational_operators/SortRunGenerationOperator.cpp
+++ b/relational_operators/SortRunGenerationOperator.cpp
@@ -21,7 +21,9 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageManager.hpp"
@@ -80,6 +82,42 @@ bool SortRunGenerationOperator::getAllWorkOrders(
   }
 }
 
+// Emits one serialized work order per input block not yet handed out.
+bool SortRunGenerationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (!input_relation_is_stored_) {
+    // Pipelined input: only cover the blocks not yet turned into work orders.
+    for (; num_workorders_generated_ < input_relation_block_ids_.size(); ++num_workorders_generated_) {
+      const block_id next_block = input_relation_block_ids_[num_workorders_generated_];
+      container->addWorkOrderProto(createWorkOrderProto(next_block), op_index_);
+    }
+    return done_feeding_input_relation_;
+  }
+
+  // Stored input: the blocks come from a base relation, so generate
+  // everything on the first call and nothing afterwards.
+  if (!started_) {
+    for (const block_id stored_block : input_relation_block_ids_) {
+      container->addWorkOrderProto(createWorkOrderProto(stored_block), op_index_);
+    }
+    started_ = true;
+  }
+  return true;
+}
+
+// Packs a single input block into a SORT_RUN_GENERATION work order proto.
+serialization::WorkOrder* SortRunGenerationOperator::createWorkOrderProto(const block_id block) {
+  auto *work_order = new serialization::WorkOrder;
+  work_order->set_work_order_type(serialization::SORT_RUN_GENERATION);
+
+  work_order->SetExtension(serialization::SortRunGenerationWorkOrder::block_id, block);
+  work_order->SetExtension(serialization::SortRunGenerationWorkOrder::relation_id, input_relation_.getID());
+  work_order->SetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index, sort_config_index_);
+  work_order->SetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index,
+                           output_destination_index_);
+  return work_order;
+}
+
+
 void SortRunGenerationWorkOrder::execute() {
   BlockReference block(
       storage_manager_->getBlock(input_block_id_, input_relation_));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index f96e6a6..96a3ce1 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -40,8 +40,11 @@ namespace quickstep {
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
+namespace serialization { class WorkOrder; }
+
 /**
  * \defgroup Sort Sorting
  * \ingroup RelationalOperators
@@ -112,6 +115,8 @@ class SortRunGenerationOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
     DCHECK(input_relation_id == input_relation_.getID());
     input_relation_block_ids_.push_back(input_block_id);
@@ -133,6 +138,13 @@ class SortRunGenerationOperator : public RelationalOperator {
   }
 
  private:
+  /**
+   * @brief Create a SORT_RUN_GENERATION Work Order proto for one input block.
+   * @param block The block id used in the Work Order.
+   * @return A serialization::WorkOrder allocated with new.
+   **/
+  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
   const CatalogRelation &input_relation_;
 
   const CatalogRelation &output_relation_;

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/TableGeneratorOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp
index a3f9340..89c02c4 100644
--- a/relational_operators/TableGeneratorOperator.cpp
+++ b/relational_operators/TableGeneratorOperator.cpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *   University of Wisconsin—Madison.
+ *     University of Wisconsin—Madison.
  *   Copyright 2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,7 +20,9 @@
 
 #include "expressions/table_generator/GeneratorFunctionHandle.hpp"
 #include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 
@@ -53,6 +55,21 @@ bool TableGeneratorOperator::getAllWorkOrders(
   return started_;
 }
 
+// Emits the operator's single TABLE_GENERATOR work order on the first call.
+bool TableGeneratorOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (started_) {
+    return true;  // Already generated; nothing further to emit.
+  }
+  auto *order = new serialization::WorkOrder;
+  order->set_work_order_type(serialization::TABLE_GENERATOR);
+  order->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index, generator_function_index_);
+  order->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index, output_destination_index_);
+  container->addWorkOrderProto(order, op_index_);
+  started_ = true;
+  return true;
+}
+
+
 void TableGeneratorWorkOrder::execute() {
   ColumnVectorsValueAccessor temp_result;
   function_handle_.populateColumns(&temp_result);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index 6a6af4b..1b791a6 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -1,6 +1,6 @@
 /**
  *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- *   University of Wisconsin—Madison.
+ *     University of Wisconsin—Madison.
  *   Copyright 2016 Pivotal Software, Inc.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
@@ -40,6 +40,7 @@ namespace quickstep {
 class GeneratorFunctionHandle;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
 /** \addtogroup RelationalOperators
@@ -81,6 +82,8 @@ class TableGeneratorOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 5acecbf..3e3d9ef 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -33,8 +33,10 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/TextScanOperator.pb.h"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -217,6 +219,81 @@ bool TextScanOperator::getAllWorkOrders(
   }
 }
 
+// Adds serialized work orders for this load to 'container'. Returns true once
+// no further work orders will be generated. Written to be called repeatedly,
+// so the file pattern is glob-expanded only where the file list is consumed.
+bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (parallelize_load_) {
+    // Parallel implementation: Split work orders are generated for each file
+    // being bulk-loaded. (More than one file can be loaded, because we support
+    // glob() semantics in file name.) These work orders read the input file,
+    // and split them in the blobs that can be parsed independently.
+    if (blocking_dependencies_met_) {
+      if (!work_generated_) {
+        work_generated_ = true;
+
+        // First, generate text-split work orders. This branch runs once, so
+        // the pattern is expanded a single time instead of on every call.
+        for (const string &file : utility::file::GlobExpand(file_pattern_)) {
+          serialization::WorkOrder *proto = new serialization::WorkOrder;
+          proto->set_work_order_type(serialization::TEXT_SPLIT);
+
+          proto->SetExtension(serialization::TextSplitWorkOrder::operator_index, op_index_);
+          proto->SetExtension(serialization::TextSplitWorkOrder::filename, file);
+          proto->SetExtension(serialization::TextSplitWorkOrder::process_escape_sequences, process_escape_sequences_);
+
+          container->addWorkOrderProto(proto, op_index_);
+          ++num_split_work_orders_;
+        }
+        return false;
+      } else {
+        // Check if there are blobs to parse.
+        while (!text_blob_queue_.empty()) {
+          const TextBlob blob_work = text_blob_queue_.popOne();
+
+          serialization::WorkOrder *proto = new serialization::WorkOrder;
+          proto->set_work_order_type(serialization::TEXT_SCAN);
+
+          proto->SetExtension(serialization::TextScanWorkOrder::field_terminator, field_terminator_);
+          proto->SetExtension(serialization::TextScanWorkOrder::process_escape_sequences, process_escape_sequences_);
+          proto->SetExtension(serialization::TextScanWorkOrder::insert_destination_index, output_destination_index_);
+
+          serialization::TextBlob *text_blob_proto =
+              proto->MutableExtension(serialization::TextScanWorkOrder::text_blob);
+          text_blob_proto->set_blob_id(blob_work.blob_id);
+          text_blob_proto->set_size(blob_work.size);
+
+          container->addWorkOrderProto(proto, op_index_);
+        }
+        // Done if all split work orders are completed, and no blobs are left to
+        // process.
+        return num_done_split_work_orders_.load(std::memory_order_acquire) == num_split_work_orders_ &&
+               text_blob_queue_.empty();
+      }
+    }
+    // Blocking dependencies are not met yet, so nothing can be generated.
+    return false;
+  } else {
+    // Serial implementation: one TEXT_SCAN work order per matching file,
+    // generated once; the pattern is expanded only at that point.
+    if (blocking_dependencies_met_ && !work_generated_) {
+      for (const string &file : utility::file::GlobExpand(file_pattern_)) {
+        serialization::WorkOrder *proto = new serialization::WorkOrder;
+        proto->set_work_order_type(serialization::TEXT_SCAN);
+
+        proto->SetExtension(serialization::TextScanWorkOrder::field_terminator, field_terminator_);
+        proto->SetExtension(serialization::TextScanWorkOrder::process_escape_sequences, process_escape_sequences_);
+        proto->SetExtension(serialization::TextScanWorkOrder::insert_destination_index, output_destination_index_);
+        proto->SetExtension(serialization::TextScanWorkOrder::filename, file);
+
+        container->addWorkOrderProto(proto, op_index_);
+      }
+      work_generated_ = true;
+    }
+    return work_generated_;
+  }
+}
+
 void TextScanOperator::receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
   switch (msg.type()) {
     case kSplitWorkOrderCompletionMessage: {

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 3cda65b..c736145 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -48,6 +48,7 @@ namespace quickstep {
 class CatalogRelationSchema;
 class InsertDestination;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
 /** \addtogroup RelationalOperators
@@ -163,6 +164,8 @@ class TextScanOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   QueryContext::insert_destination_id getInsertDestinationID() const override {
     return output_destination_index_;
   }

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 2130563..ebf026b 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -26,7 +26,9 @@
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -72,6 +74,26 @@ bool UpdateOperator::getAllWorkOrders(
   return started_;
 }
 
+// Emits one UPDATE work order proto per input block, once dependencies are met.
+bool UpdateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+  if (started_ || !blocking_dependencies_met_) {
+    return started_;  // Either already generated, or not ready to generate.
+  }
+  for (const block_id block : input_blocks_) {
+    auto *order = new serialization::WorkOrder;
+    order->set_work_order_type(serialization::UPDATE);
+    order->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
+    order->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
+    order->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
+    order->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
+    order->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
+    order->SetExtension(serialization::UpdateWorkOrder::block_id, block);
+    container->addWorkOrderProto(order, op_index_);
+  }
+  started_ = true;
+  return true;
+}
+
 void UpdateWorkOrder::execute() {
   MutableBlockReference block(
       storage_manager_->getBlockMutable(input_block_id_, relation_));

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/3f312fb2/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index cebb9b5..f6c5053 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -45,6 +45,7 @@ class InsertDestination;
 class Predicate;
 class Scalar;
 class StorageManager;
+class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
 /** \addtogroup RelationalOperators
@@ -99,6 +100,8 @@ class UpdateOperator : public RelationalOperator {
                         const tmb::client_id scheduler_client_id,
                         tmb::MessageBus *bus) override;
 
+  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
   QueryContext::insert_destination_id getInsertDestinationID() const override {
     return relocation_destination_index_;
   }