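You are viewing a plain-text version of this content. The canonical version is the original hyperlinked page in the commits@quickstep.apache.org mailing-list archive; the link itself was lost in the plain-text conversion.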
Posted to commits@quickstep.apache.org by ji...@apache.org on 2017/05/04 03:22:47 UTC
[03/32] incubator-quickstep git commit: Add protobuf support for union all operator.
Add protobuf support for union all operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/758f07a1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/758f07a1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/758f07a1
Branch: refs/heads/new-op
Commit: 758f07a1613e68823fab9323461fdcae56714365
Parents: 3c83c93
Author: Tianrun <Ti...@node-0.tianrun-qv23700.quickstep-pg0.wisc.cloudlab.us>
Authored: Mon Apr 17 11:03:13 2017 -0600
Committer: Tianrun <Ti...@node-2.tianrun-qv24978.quickstep-pg0.wisc.cloudlab.us>
Committed: Thu Apr 20 14:41:28 2017 -0600
----------------------------------------------------------------------
relational_operators/CMakeLists.txt | 1 +
relational_operators/UnionAllOperator.cpp | 58 ++++++++++++++++++++++----
relational_operators/UnionAllOperator.hpp | 11 ++++-
relational_operators/WorkOrder.proto | 10 +++++
relational_operators/WorkOrderFactory.cpp | 40 ++++++++++++++++++
5 files changed, 110 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 4ea809b..39538ea 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -586,6 +586,7 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_SortRunGenerationOperator
quickstep_relationaloperators_TableGeneratorOperator
quickstep_relationaloperators_TextScanOperator
+ quickstep_relationaloperators_UnionAllOperator
quickstep_relationaloperators_UpdateOperator
quickstep_relationaloperators_WindowAggregationOperator
quickstep_relationaloperators_WorkOrder_proto
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/UnionAllOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UnionAllOperator.cpp b/relational_operators/UnionAllOperator.cpp
index 141b3cf..1d82aef 100644
--- a/relational_operators/UnionAllOperator.cpp
+++ b/relational_operators/UnionAllOperator.cpp
@@ -65,7 +65,7 @@ void UnionAllOperator::addWorkOrdersSingleRelation(
container->addNormalWorkOrder(
new UnionAllWorkOrder(
query_id_,
- input_relations_[relation_index],
+ *input_relations_[relation_index],
input_block_id,
select_attribute_ids_[relation_index],
output_destination,
@@ -75,11 +75,11 @@ void UnionAllOperator::addWorkOrdersSingleRelation(
} else {
std::size_t num_generated = num_workorders_generated_[relation_index];
const std::vector<block_id> &all_blocks = input_relations_block_ids_[relation_index];
- while (num_generated < all_blocks .size()) {
+ while (num_generated < all_blocks.size()) {
container->addNormalWorkOrder(
new UnionAllWorkOrder(
query_id_,
- input_relations_[relation_index],
+ *input_relations_[relation_index],
all_blocks[num_generated],
select_attribute_ids_[relation_index],
output_destination,
@@ -124,18 +124,60 @@ bool UnionAllOperator::getAllWorkOrders(
relation_index);
}
}
- return stored_generated_ && done_feeding_input_relation_;
+ return done_feeding_input_relation_;
}
bool UnionAllOperator::getAllWorkOrderProtos(WorkOrderProtosContainer* container) {
- // TODO(tianrun): Add protobuf for UnionAllWorkOrder to support distributed mode.
- LOG(FATAL) << "UnionAllOperator is not supported in distributed mode yet.";
- return true;
+ if (!stored_generated_) {
+ for (std::size_t relation_index = 0; relation_index < input_relations_.size(); ++relation_index) {
+ if (input_relations_are_stored_[relation_index]) {
+ const std::vector<block_id> &all_blocks = input_relations_block_ids_[relation_index];
+ const relation_id relation = input_relations_[relation_index]->getID();
+ const std::vector<attribute_id> &attributes = select_attribute_ids_[relation_index];
+ for (const block_id block : all_blocks) {
+ container->addWorkOrderProto(createWorkOrderProto(block, relation, attributes), op_index_);
+ }
+ }
+ }
+ stored_generated_ = true;
+ }
+
+ for (std::size_t relation_index = 0; relation_index < input_relations_.size(); ++relation_index) {
+ if (!input_relations_are_stored_[relation_index]) {
+ const std::vector<block_id> &all_blocks = input_relations_block_ids_[relation_index];
+ std::size_t num_generated = num_workorders_generated_[relation_index];
+ const relation_id relation = input_relations_[relation_index]->getID();
+ const std::vector<attribute_id> &attributes = select_attribute_ids_[relation_index];
+ while (num_generated < all_blocks.size()) {
+ container->addWorkOrderProto(createWorkOrderProto(all_blocks[num_generated], relation, attributes), op_index_);
+ ++num_generated;
+ }
+ num_workorders_generated_[relation_index] = num_generated;
+ }
+ }
+ return done_feeding_input_relation_;
+}
+
+serialization::WorkOrder* UnionAllOperator::createWorkOrderProto(
+ const block_id block,
+ const relation_id relation,
+ const std::vector<attribute_id> &attributes) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::UNION_ALL);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::UnionAllWorkOrder::relation_id, relation);
+ proto->SetExtension(serialization::UnionAllWorkOrder::insert_destination_index, output_destination_index_);
+ proto->SetExtension(serialization::UnionAllWorkOrder::block_id, block);
+ for (const attribute_id attr : attributes) {
+ proto->AddExtension(serialization::UnionAllWorkOrder::select_attribute_id, attr);
+ }
+ return proto;
}
void UnionAllWorkOrder::execute() {
BlockReference block(
- storage_manager_->getBlock(input_block_id_, *input_relation_));
+ storage_manager_->getBlock(input_block_id_, input_relation_));
block->selectSimple(select_attribute_id_,
nullptr,
output_destination_);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/UnionAllOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UnionAllOperator.hpp b/relational_operators/UnionAllOperator.hpp
index 3bfed82..4fc2906 100644
--- a/relational_operators/UnionAllOperator.hpp
+++ b/relational_operators/UnionAllOperator.hpp
@@ -45,6 +45,8 @@ class StorageManager;
class WorkOrderProtosContainer;
class WorkOrdersContainer;
+namespace serialization { class WorkOrder; }
+
/** \addtogroup RelationalOperators
* @{
*/
@@ -149,6 +151,11 @@ class UnionAllOperator : public RelationalOperator {
InsertDestination *output_destination,
const std::size_t relation_index);
+ // Create work order proto
+ serialization::WorkOrder* createWorkOrderProto(const block_id block,
+ const relation_id relation,
+ const std::vector<attribute_id> &attributes);
+
const std::vector<const CatalogRelation*> input_relations_;
const std::vector<bool> input_relations_are_stored_;
@@ -194,7 +201,7 @@ class UnionAllWorkOrder : public WorkOrder {
* @param storage_manager The StorageManager to use.
*/
UnionAllWorkOrder(const std::size_t query_id,
- const CatalogRelationSchema *input_relation,
+ const CatalogRelationSchema &input_relation,
const block_id input_block_id,
const std::vector<attribute_id> &select_attribute_id,
InsertDestination *output_destination,
@@ -211,7 +218,7 @@ class UnionAllWorkOrder : public WorkOrder {
void execute() override;
private:
- const CatalogRelationSchema *input_relation_;
+ const CatalogRelationSchema &input_relation_;
const block_id input_block_id_;
const std::vector<attribute_id> select_attribute_id_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index d0d0753..12a65ca 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -45,6 +45,7 @@ enum WorkOrderType {
WINDOW_AGGREGATION = 21;
DESTROY_AGGREGATION_STATE = 22;
BUILD_AGGREGATION_EXISTENCE_MAP = 23;
+ UNION_ALL = 24;
}
message WorkOrder {
@@ -291,3 +292,12 @@ message BuildAggregationExistenceMapWorkOrder {
optional uint32 aggr_state_index = 371;
}
}
+
+message UnionAllWorkOrder {
+ extend WorkOrder {
+ optional int32 relation_id = 384;
+ optional int32 insert_destination_index = 385;
+ optional fixed64 block_id = 386;
+ repeated int32 select_attribute_id = 387;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/758f07a1/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 56f431b..d63bb62 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -49,6 +49,7 @@
#include "relational_operators/SortRunGenerationOperator.hpp"
#include "relational_operators/TableGeneratorOperator.hpp"
#include "relational_operators/TextScanOperator.hpp"
+#include "relational_operators/UnionAllOperator.hpp"
#include "relational_operators/UpdateOperator.hpp"
#include "relational_operators/WindowAggregationOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
@@ -492,6 +493,23 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
hdfs);
}
+ case serialization::UNION_ALL: {
+ LOG(INFO) << "Creating UnionAllWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
+ vector<attribute_id> select_attribute_id;
+ for (int i = 0; i < proto.ExtensionSize(serialization::UnionAllWorkOrder::select_attribute_id); ++i) {
+ select_attribute_id.push_back(
+ proto.GetExtension(serialization::UnionAllWorkOrder::select_attribute_id, i));
+ }
+ return new UnionAllWorkOrder(
+ proto.query_id(),
+ catalog_database->getRelationSchemaById(
+ proto.GetExtension(serialization::UnionAllWorkOrder::relation_id)),
+ proto.GetExtension(serialization::UnionAllWorkOrder::block_id),
+ select_attribute_id,
+ query_context->getInsertDestination(
+ proto.GetExtension(serialization::UnionAllWorkOrder::insert_destination_index)),
+ storage_manager);
+ }
case serialization::UPDATE: {
LOG(INFO) << "Creating UpdateWorkOrder for Query " << proto.query_id() << " in Shiftboss " << shiftboss_index;
return new UpdateWorkOrder(
@@ -892,6 +910,28 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
query_context.isValidInsertDestinationId(
proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index));
}
+ case serialization::UNION_ALL: {
+ if (!proto.HasExtension(serialization::UnionAllWorkOrder::relation_id) ||
+ !proto.HasExtension(serialization::UnionAllWorkOrder::block_id) ||
+ !proto.HasExtension(serialization::UnionAllWorkOrder::insert_destination_index) ||
+ !query_context.isValidInsertDestinationId(
+ proto.GetExtension(serialization::UnionAllWorkOrder::insert_destination_index))) {
+ return false;
+ }
+
+ const relation_id rel_id = proto.GetExtension(serialization::UnionAllWorkOrder::relation_id);
+ if (!catalog_database.hasRelationWithId(rel_id)) {
+ return false;
+ }
+ const CatalogRelationSchema &relation = catalog_database.getRelationSchemaById(rel_id);
+ for (int i = 0; i < proto.ExtensionSize(serialization::UnionAllWorkOrder::select_attribute_id); ++i) {
+ if (!relation.hasAttributeWithId(
+ proto.GetExtension(serialization::UnionAllWorkOrder::select_attribute_id, i))) {
+ return false;
+ }
+ }
+ return true;
+ }
case serialization::UPDATE: {
return proto.HasExtension(serialization::UpdateWorkOrder::relation_id) &&
catalog_database.hasRelationWithId(