You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/05/04 03:22:47 UTC

[03/32] incubator-quickstep git commit: Add protobuf support for union all operator.

Add protobuf support for union all operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/758f07a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/758f07a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/758f07a1

Branch: refs/heads/new-op
Commit: 758f07a1613e68823fab9323461fdcae56714365
Parents: 3c83c93
Author: Tianrun <Ti...@node-0.tianrun-qv23700.quickstep-pg0.wisc.cloudlab.us>
Authored: Mon Apr 17 11:03:13 2017 -0600
Committer: Tianrun <Ti...@node-2.tianrun-qv24978.quickstep-pg0.wisc.cloudlab.us>
Committed: Thu Apr 20 14:41:28 2017 -0600

----------------------------------------------------------------------
 relational_operators/CMakeLists.txt       |  1 +
 relational_operators/UnionAllOperator.cpp | 58 ++++++++++++++++++++++----
 relational_operators/UnionAllOperator.hpp | 11 ++++-
 relational_operators/WorkOrder.proto      | 10 +++++
 relational_operators/WorkOrderFactory.cpp | 40 ++++++++++++++++++
 5 files changed, 110 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 4ea809b..39538ea 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -586,6 +586,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_SortRunGenerationOperator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
+                      quickstep_relationaloperators_UnionAllOperator
                       quickstep_relationaloperators_UpdateOperator
                       quickstep_relationaloperators_WindowAggregationOperator
                       quickstep_relationaloperators_WorkOrder_proto

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/UnionAllOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UnionAllOperator.cpp b/relational_operators/UnionAllOperator.cpp
index 141b3cf..1d82aef 100644
--- a/relational_operators/UnionAllOperator.cpp
+++ b/relational_operators/UnionAllOperator.cpp
@@ -65,7 +65,7 @@ void UnionAllOperator::addWorkOrdersSingleRelation(
       container->addNormalWorkOrder(
           new UnionAllWorkOrder(
               query_id_,
-              input_relations_[relation_index],
+              *input_relations_[relation_index],
               input_block_id,
               select_attribute_ids_[relation_index],
               output_destination,
@@ -75,11 +75,11 @@ void UnionAllOperator::addWorkOrdersSingleRelation(
   } else {
     std::size_t num_generated = num_workorders_generated_[relation_index];
     const std::vector<block_id> &all_blocks = input_relations_block_ids_[relation_index];
-    while (num_generated < all_blocks .size()) {
+    while (num_generated < all_blocks.size()) {
       container->addNormalWorkOrder(
           new UnionAllWorkOrder(
               query_id_,
-              input_relations_[relation_index],
+              *input_relations_[relation_index],
               all_blocks[num_generated],
               select_attribute_ids_[relation_index],
               output_destination,
@@ -124,18 +124,60 @@ bool UnionAllOperator::getAllWorkOrders(
                                    relation_index);
     }
   }
-  return stored_generated_ && done_feeding_input_relation_;
+  return done_feeding_input_relation_;
 }
 
 bool UnionAllOperator::getAllWorkOrderProtos(WorkOrderProtosContainer* container) {
-  // TODO(tianrun): Add protobuf for UnionAllWorkOrder to support distributed mode.
-  LOG(FATAL) << "UnionAllOperator is not supported in distributed mode yet.";
-  return true;
+  if (!stored_generated_) {  // first call only: cover every block of each stored input
+    for (std::size_t relation_index = 0; relation_index < input_relations_.size(); ++relation_index) {
+      if (input_relations_are_stored_[relation_index]) {
+        const std::vector<block_id> &all_blocks = input_relations_block_ids_[relation_index];
+        const relation_id relation = input_relations_[relation_index]->getID();
+        const std::vector<attribute_id> &attributes = select_attribute_ids_[relation_index];
+        for (const block_id block : all_blocks) {
+          container->addWorkOrderProto(createWorkOrderProto(block, relation, attributes), op_index_);
+        }
+      }
+    }
+    stored_generated_ = true;  // later calls skip the stored inputs
+  }
+
+  for (std::size_t relation_index = 0; relation_index < input_relations_.size(); ++relation_index) {
+    if (!input_relations_are_stored_[relation_index]) {  // non-stored inputs are revisited on every call
+      const std::vector<block_id> &all_blocks = input_relations_block_ids_[relation_index];
+      std::size_t num_generated = num_workorders_generated_[relation_index];  // resume after the last emitted block
+      const relation_id relation = input_relations_[relation_index]->getID();
+      const std::vector<attribute_id> &attributes = select_attribute_ids_[relation_index];
+      while (num_generated < all_blocks.size()) {
+        container->addWorkOrderProto(createWorkOrderProto(all_blocks[num_generated], relation, attributes), op_index_);
+        ++num_generated;
+      }
+      num_workorders_generated_[relation_index] = num_generated;  // record progress for the next call
+    }
+  }
+  return done_feeding_input_relation_;
+}
+
+serialization::WorkOrder* UnionAllOperator::createWorkOrderProto(
+    const block_id block,  // input block the work order will read
+    const relation_id relation,  // ID of the input relation that block belongs to
+    const std::vector<attribute_id> &attributes) {  // attribute IDs to select from the block
+  serialization::WorkOrder *proto = new serialization::WorkOrder;  // heap-allocated; returned as a raw pointer
+  proto->set_work_order_type(serialization::UNION_ALL);
+  proto->set_query_id(query_id_);
+
+  proto->SetExtension(serialization::UnionAllWorkOrder::relation_id, relation);
+  proto->SetExtension(serialization::UnionAllWorkOrder::insert_destination_index, output_destination_index_);
+  proto->SetExtension(serialization::UnionAllWorkOrder::block_id, block);
+  for (const attribute_id attr : attributes) {  // one repeated-field entry per attribute, in vector order
+    proto->AddExtension(serialization::UnionAllWorkOrder::select_attribute_id, attr);
+  }
+  return proto;
 }
 
 void UnionAllWorkOrder::execute() {
   BlockReference block(
-      storage_manager_->getBlock(input_block_id_, *input_relation_));
+      storage_manager_->getBlock(input_block_id_, input_relation_));
   block->selectSimple(select_attribute_id_,
                       nullptr,
                       output_destination_);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/UnionAllOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UnionAllOperator.hpp b/relational_operators/UnionAllOperator.hpp
index 3bfed82..4fc2906 100644
--- a/relational_operators/UnionAllOperator.hpp
+++ b/relational_operators/UnionAllOperator.hpp
@@ -45,6 +45,8 @@ class StorageManager;
 class WorkOrderProtosContainer;
 class WorkOrdersContainer;
 
+namespace serialization { class WorkOrder; }
+
 /** \addtogroup RelationalOperators
  *  @{
  */
@@ -149,6 +151,11 @@ class UnionAllOperator : public RelationalOperator {
                                    InsertDestination *output_destination,
                                    const std::size_t relation_index);
 
+  // Creates the serialized UNION_ALL WorkOrder proto for one block of an input relation.
+  serialization::WorkOrder* createWorkOrderProto(const block_id block,
+                                                 const relation_id relation,
+                                                 const std::vector<attribute_id> &attributes);
+
   const std::vector<const CatalogRelation*> input_relations_;
   const std::vector<bool> input_relations_are_stored_;
 
@@ -194,7 +201,7 @@ class UnionAllWorkOrder : public WorkOrder {
    * @param storage_manager The StorageManager to use.
    */
   UnionAllWorkOrder(const std::size_t query_id,
-                    const CatalogRelationSchema *input_relation,
+                    const CatalogRelationSchema &input_relation,
                     const block_id input_block_id,
                     const std::vector<attribute_id> &select_attribute_id,
                     InsertDestination *output_destination,
@@ -211,7 +218,7 @@ class UnionAllWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
-  const CatalogRelationSchema *input_relation_;
+  const CatalogRelationSchema &input_relation_;
   const block_id input_block_id_;
   const std::vector<attribute_id> select_attribute_id_;
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index d0d0753..12a65ca 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -45,6 +45,7 @@ enum WorkOrderType {
   WINDOW_AGGREGATION = 21;
   DESTROY_AGGREGATION_STATE = 22;
   BUILD_AGGREGATION_EXISTENCE_MAP = 23;
+  UNION_ALL = 24;
 }
 
 message WorkOrder {
@@ -291,3 +292,12 @@ message BuildAggregationExistenceMapWorkOrder {
     optional uint32 aggr_state_index = 371;
   }
 }
+
+message UnionAllWorkOrder {
+  extend WorkOrder {
+    optional int32 relation_id = 384;  // ID of the input relation the block belongs to
+    optional int32 insert_destination_index = 385;  // index used to look up the InsertDestination in the QueryContext
+    optional fixed64 block_id = 386;  // input block to read
+    repeated int32 select_attribute_id = 387;  // attribute IDs of the input relation to select
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 56f431b..d63bb62 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -49,6 +49,7 @@
 #include "relational_operators/SortRunGenerationOperator.hpp"
 #include "relational_operators/TableGeneratorOperator.hpp"
 #include "relational_operators/TextScanOperator.hpp"
+#include "relational_operators/UnionAllOperator.hpp"
 #include "relational_operators/UpdateOperator.hpp"
 #include "relational_operators/WindowAggregationOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
@@ -492,6 +493,23 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
               proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
           hdfs);
     }
+    case serialization::UNION_ALL: {
+      LOG(INFO) << "Creating UnionAllWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+      vector<attribute_id> select_attribute_id;
+      for (int i = 0; i < proto.ExtensionSize(serialization::UnionAllWorkOrder::select_attribute_id); ++i) {
+        select_attribute_id.push_back(
+            proto.GetExtension(serialization::UnionAllWorkOrder::select_attribute_id, i));
+      }
+      return new UnionAllWorkOrder(
+          proto.query_id(),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::UnionAllWorkOrder::relation_id)),
+          proto.GetExtension(serialization::UnionAllWorkOrder::block_id),
+          select_attribute_id,
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::UnionAllWorkOrder::insert_destination_index)),
+          storage_manager);
+    }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
       return new UpdateWorkOrder(
@@ -892,6 +910,28 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
              query_context.isValidInsertDestinationId(
                  proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index));
     }
+    case serialization::UNION_ALL: {
+      if (!proto.HasExtension(serialization::UnionAllWorkOrder::relation_id) ||
+          !proto.HasExtension(serialization::UnionAllWorkOrder::block_id) ||
+          !proto.HasExtension(serialization::UnionAllWorkOrder::insert_destination_index) ||
+          !query_context.isValidInsertDestinationId(
+              proto.GetExtension(serialization::UnionAllWorkOrder::insert_destination_index))) {
+        return false;
+      }
+
+      const relation_id rel_id = proto.GetExtension(serialization::UnionAllWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+      const CatalogRelationSchema &relation = catalog_database.getRelationSchemaById(rel_id);
+      for (int i = 0; i < proto.ExtensionSize(serialization::UnionAllWorkOrder::select_attribute_id); ++i) {
+        if (!relation.hasAttributeWithId(
+                 proto.GetExtension(serialization::UnionAllWorkOrder::select_attribute_id, i))) {
+          return false;
+        }
+      }
+      return true;
+    }
     case serialization::UPDATE: {
       return proto.HasExtension(serialization::UpdateWorkOrder::relation_id) &&
              catalog_database.hasRelationWithId(