You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ha...@apache.org on 2016/06/15 19:48:38 UTC
[35/50] [abbrv] incubator-quickstep git commit: QUICKSTEP-10:
Serialized WorkOrders as proto.
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortMergeRunOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 6bf5719..e398d62 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -23,9 +23,11 @@
#include <vector>
#include "query_execution/QueryExecutionTypedefs.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/SortMergeRunOperator.pb.h"
#include "relational_operators/SortMergeRunOperatorHelpers.hpp"
+#include "relational_operators/WorkOrder.pb.h"
#include "threading/ThreadIDBasedMap.hpp"
#include "glog/logging.h"
@@ -69,6 +71,72 @@ bool SortMergeRunOperator::getAllWorkOrders(
return generateWorkOrders(container, query_context, storage_manager, scheduler_client_id, bus);
}
+bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ if (input_relation_is_stored_) {
+ // Input blocks (or runs) are from base relation. Only possible when base
+ // relation is stored sorted.
+ if (!started_) {
+ // Initialize merge tree completely, since all input runs are known.
+ merge_tree_.initializeTree(input_relation_block_ids_.size());
+ started_ = true;
+ initializeInputRuns();
+ }
+ } else {
+ // Input blocks (or runs) are pipelined from the sorted run generation
+ // operator.
+ if (!started_ && !input_stream_done_) {
+ // Initialize merge tree for first pipeline mode.
+ merge_tree_.initializeForPipeline();
+ started_ = true;
+ initializeInputRuns();
+ }
+ }
+
+ // Get merge jobs from merge tree.
+ std::vector<MergeTree::MergeJob> jobs;
+ const bool done_generating = merge_tree_.getMergeJobs(&jobs);
+
+ for (std::vector<MergeTree::MergeJob>::size_type job_id = 0;
+ job_id < jobs.size();
+ ++job_id) {
+ // Add work order for each merge job.
+ container->addWorkOrderProto(createWorkOrderProto(&jobs[job_id]), op_index_);
+ }
+
+ return done_generating;
+}
+
+serialization::WorkOrder* SortMergeRunOperator::createWorkOrderProto(
+ merge_run_operator::MergeTree::MergeJob *job) {
+ DCHECK(job != nullptr);
+ DCHECK(!job->runs.empty());
+
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::SORT_MERGE_RUN);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::SortMergeRunWorkOrder::operator_index, op_index_);
+ proto->SetExtension(serialization::SortMergeRunWorkOrder::sort_config_index, sort_config_index_);
+
+ for (const merge_run_operator::Run &run : job->runs) {
+ serialization::Run *run_proto = proto->AddExtension(serialization::SortMergeRunWorkOrder::runs);
+ for (const block_id block : run) {
+ run_proto->add_blocks(block);
+ }
+ }
+
+ proto->SetExtension(serialization::SortMergeRunWorkOrder::top_k, top_k_);
+ proto->SetExtension(serialization::SortMergeRunWorkOrder::merge_level, job->level);
+ proto->SetExtension(serialization::SortMergeRunWorkOrder::relation_id,
+ job->level > 0 ? run_relation_.getID()
+ : input_relation_.getID());
+ proto->SetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index,
+ job->is_final_level ? output_destination_index_
+ : run_block_destination_index_);
+
+ return proto;
+}
+
WorkOrder *SortMergeRunOperator::createWorkOrder(
merge_run_operator::MergeTree::MergeJob *job,
QueryContext *query_context,
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortMergeRunOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index cfff8b9..177836f 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -44,8 +44,11 @@ namespace quickstep {
class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
+class WorkOrderProtosContainer;
class WorkOrdersContainer;
+namespace serialization { class WorkOrder; }
+
/**
* @defgroup SortMergeRun Merging Sorted Runs
* @ingroup Sort
@@ -132,6 +135,8 @@ class SortMergeRunOperator : public RelationalOperator {
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) override;
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
void feedInputBlock(const block_id input_block_id,
const relation_id input_relation_id) override {
input_relation_block_ids_.push_back(input_block_id);
@@ -182,6 +187,13 @@ class SortMergeRunOperator : public RelationalOperator {
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus);
+ /**
+ * @brief Create Work Order proto.
+ *
+ * @param job The merge job.
+ **/
+ serialization::WorkOrder* createWorkOrderProto(merge_run_operator::MergeTree::MergeJob *job);
+
const CatalogRelation &input_relation_;
const CatalogRelation &output_relation_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortRunGenerationOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.cpp b/relational_operators/SortRunGenerationOperator.cpp
index 37b8fb8..d7362db 100644
--- a/relational_operators/SortRunGenerationOperator.cpp
+++ b/relational_operators/SortRunGenerationOperator.cpp
@@ -21,7 +21,9 @@
#include "catalog/CatalogRelation.hpp"
#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageManager.hpp"
@@ -80,6 +82,43 @@ bool SortRunGenerationOperator::getAllWorkOrders(
}
}
+bool SortRunGenerationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ if (input_relation_is_stored_) {
+ // Input blocks are from a base relation.
+ if (!started_) {
+ for (const block_id input_block_id : input_relation_block_ids_) {
+ container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+ }
+ started_ = true;
+ }
+ return true;
+ } else {
+ // Input blocks are pipelined.
+ while (num_workorders_generated_ < input_relation_block_ids_.size()) {
+ container->addWorkOrderProto(
+ createWorkOrderProto(input_relation_block_ids_[num_workorders_generated_]),
+ op_index_);
+ ++num_workorders_generated_;
+ }
+ return done_feeding_input_relation_;
+ }
+}
+
+serialization::WorkOrder* SortRunGenerationOperator::createWorkOrderProto(const block_id block) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::SORT_RUN_GENERATION);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index, sort_config_index_);
+ proto->SetExtension(serialization::SortRunGenerationWorkOrder::relation_id, input_relation_.getID());
+ proto->SetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index,
+ output_destination_index_);
+ proto->SetExtension(serialization::SortRunGenerationWorkOrder::block_id, block);
+
+ return proto;
+}
+
+
void SortRunGenerationWorkOrder::execute() {
BlockReference block(
storage_manager_->getBlock(input_block_id_, input_relation_));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/SortRunGenerationOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/SortRunGenerationOperator.hpp b/relational_operators/SortRunGenerationOperator.hpp
index f96e6a6..96a3ce1 100644
--- a/relational_operators/SortRunGenerationOperator.hpp
+++ b/relational_operators/SortRunGenerationOperator.hpp
@@ -40,8 +40,11 @@ namespace quickstep {
class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
+class WorkOrderProtosContainer;
class WorkOrdersContainer;
+namespace serialization { class WorkOrder; }
+
/**
* \defgroup Sort Sorting
* \ingroup RelationalOperators
@@ -112,6 +115,8 @@ class SortRunGenerationOperator : public RelationalOperator {
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) override;
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
DCHECK(input_relation_id == input_relation_.getID());
input_relation_block_ids_.push_back(input_block_id);
@@ -133,6 +138,13 @@ class SortRunGenerationOperator : public RelationalOperator {
}
private:
+ /**
+ * @brief Create Work Order proto.
+ *
+ * @param block The block id used in the Work Order.
+ **/
+ serialization::WorkOrder* createWorkOrderProto(const block_id block);
+
const CatalogRelation &input_relation_;
const CatalogRelation &output_relation_;
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TableGeneratorOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp
index a3f9340..d5a08ec 100644
--- a/relational_operators/TableGeneratorOperator.cpp
+++ b/relational_operators/TableGeneratorOperator.cpp
@@ -1,6 +1,6 @@
/**
* Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin\u2014Madison.
+ * University of Wisconsin\u2014Madison.
* Copyright 2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -20,7 +20,9 @@
#include "expressions/table_generator/GeneratorFunctionHandle.hpp"
#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
#include "storage/InsertDestination.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
@@ -53,6 +55,22 @@ bool TableGeneratorOperator::getAllWorkOrders(
return started_;
}
+bool TableGeneratorOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ if (!started_) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::TABLE_GENERATOR);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index, generator_function_index_);
+ proto->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index, output_destination_index_);
+
+ container->addWorkOrderProto(proto, op_index_);
+ started_ = true;
+ }
+ return true;
+}
+
+
void TableGeneratorWorkOrder::execute() {
ColumnVectorsValueAccessor temp_result;
function_handle_.populateColumns(&temp_result);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TableGeneratorOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TableGeneratorOperator.hpp b/relational_operators/TableGeneratorOperator.hpp
index 6a6af4b..1b791a6 100644
--- a/relational_operators/TableGeneratorOperator.hpp
+++ b/relational_operators/TableGeneratorOperator.hpp
@@ -1,6 +1,6 @@
/**
* Copyright 2016, Quickstep Research Group, Computer Sciences Department,
- * University of Wisconsin\u2014Madison.
+ * University of Wisconsin\u2014Madison.
* Copyright 2016 Pivotal Software, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -40,6 +40,7 @@ namespace quickstep {
class GeneratorFunctionHandle;
class InsertDestination;
class StorageManager;
+class WorkOrderProtosContainer;
class WorkOrdersContainer;
/** \addtogroup RelationalOperators
@@ -81,6 +82,8 @@ class TableGeneratorOperator : public RelationalOperator {
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) override;
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) override {
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index d2fd0cd..49c9150 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -22,6 +22,7 @@
#include <algorithm>
#include <cctype>
#include <cstddef>
+#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <memory>
@@ -31,21 +32,46 @@
#include "catalog/CatalogAttribute.hpp"
#include "query_execution/QueryContext.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
#include "storage/InsertDestination.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
-#include "types/containers/Tuple.hpp"
#include "types/containers/ColumnVector.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "types/containers/Tuple.hpp"
#include "utility/Glob.hpp"
+#include "gflags/gflags.h"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
+using std::size_t;
+using std::string;
+
namespace quickstep {
+// Text segment size set to 256KB.
+DEFINE_uint64(textscan_text_segment_size, 0x40000,
+ "Size of text segment in bytes the input text files "
+ "are split into in the TextScanOperator.");
+
+// Check if the segment size is positive.
+static bool ValidateTextScanTextSegmentSize(const char *flagname,
+ std::uint64_t text_segment_size) {
+ if (text_segment_size == 0) {
+ LOG(ERROR) << "--" << flagname << " must be greater than 0";
+ return false;
+ }
+
+ return true;
+}
+
+static const volatile bool text_scan_text_segment_size_dummy = gflags::RegisterFlagValidator(
+ &FLAGS_textscan_text_segment_size, &ValidateTextScanTextSegmentSize);
+
bool TextScanOperator::getAllWorkOrders(
WorkOrdersContainer *container,
QueryContext *query_context,
@@ -56,16 +82,12 @@ bool TextScanOperator::getAllWorkOrders(
const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_);
- if (files.size() == 0) {
- LOG(FATAL) << "No files matched '" << file_pattern_ << "'. Exiting.";
- }
+ CHECK_NE(files.size(), 0u)
+ << "No files matched '" << file_pattern_ << "'. Exiting.";
InsertDestination *output_destination =
query_context->getInsertDestination(output_destination_index_);
- // Text segment size set to 256KB.
- constexpr std::size_t kTextSegmentSize = 0x40000u;
-
if (blocking_dependencies_met_ && !work_generated_) {
for (const std::string &file : files) {
// Use standard C libary to retrieve the file size.
@@ -75,18 +97,32 @@ bool TextScanOperator::getAllWorkOrders(
std::fclose(fp);
std::size_t text_offset = 0;
- while (text_offset < file_size) {
+ for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+ num_full_segments > 0;
+ --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+ container->addNormalWorkOrder(
+ new TextScanWorkOrder(query_id_,
+ file,
+ text_offset,
+ FLAGS_textscan_text_segment_size,
+ field_terminator_,
+ process_escape_sequences_,
+ output_destination),
+ op_index_);
+ }
+
+ // Deal with the residual partial segment whose size is less than
+ // 'FLAGS_textscan_text_segment_size'.
+ if (text_offset < file_size) {
container->addNormalWorkOrder(
new TextScanWorkOrder(query_id_,
file,
text_offset,
- std::min(kTextSegmentSize, file_size - text_offset),
+ file_size - text_offset,
field_terminator_,
process_escape_sequences_,
- output_destination,
- storage_manager),
+ output_destination),
op_index_);
- text_offset += kTextSegmentSize;
}
}
work_generated_ = true;
@@ -94,24 +130,53 @@ bool TextScanOperator::getAllWorkOrders(
return work_generated_;
}
-TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
- const std::string &filename,
- const std::size_t text_offset,
- const std::size_t text_segment_size,
- const char field_terminator,
- const bool process_escape_sequences,
- InsertDestination *output_destination,
- StorageManager *storage_manager)
- : WorkOrder(query_id),
- filename_(filename),
- text_offset_(text_offset),
- text_segment_size_(text_segment_size),
- field_terminator_(field_terminator),
- process_escape_sequences_(process_escape_sequences),
- output_destination_(output_destination),
- storage_manager_(storage_manager) {
- DCHECK(output_destination_ != nullptr);
- DCHECK(storage_manager_ != nullptr);
+bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_);
+ if (blocking_dependencies_met_ && !work_generated_) {
+ for (const string &file : files) {
+ // Use standard C libary to retrieve the file size.
+ FILE *fp = std::fopen(file.c_str(), "rb");
+ std::fseek(fp, 0, SEEK_END);
+ const std::size_t file_size = std::ftell(fp);
+ std::fclose(fp);
+
+ size_t text_offset = 0;
+ for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+ num_full_segments > 0;
+ --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+ container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size),
+ op_index_);
+ }
+
+ // Deal with the residual partial segment whose size is less than
+ // 'FLAGS_textscan_text_segment_size'.
+ if (text_offset < file_size) {
+ container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size - text_offset),
+ op_index_);
+ }
+ }
+ work_generated_ = true;
+ }
+ return work_generated_;
+}
+
+serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &filename,
+ const size_t text_offset,
+ const size_t text_segment_size) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::TEXT_SCAN);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::TextScanWorkOrder::filename, filename);
+ proto->SetExtension(serialization::TextScanWorkOrder::text_offset, text_offset);
+ proto->SetExtension(serialization::TextScanWorkOrder::text_segment_size, text_segment_size);
+ proto->SetExtension(serialization::TextScanWorkOrder::field_terminator, field_terminator_);
+ proto->SetExtension(serialization::TextScanWorkOrder::process_escape_sequences,
+ process_escape_sequences_);
+ proto->SetExtension(serialization::TextScanWorkOrder::insert_destination_index,
+ output_destination_index_);
+
+ return proto;
}
void TextScanWorkOrder::execute() {
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index d73e7dd..1a62ded 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -33,6 +33,8 @@
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"
+#include "glog/logging.h"
+
#include "tmb/id_typedefs.h"
namespace tmb { class MessageBus; }
@@ -42,8 +44,11 @@ namespace quickstep {
class CatalogRelationSchema;
class InsertDestination;
class StorageManager;
+class WorkOrderProtosContainer;
class WorkOrdersContainer;
+namespace serialization { class WorkOrder; }
+
/** \addtogroup RelationalOperators
* @{
*/
@@ -135,6 +140,8 @@ class TextScanOperator : public RelationalOperator {
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) override;
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
QueryContext::insert_destination_id getInsertDestinationID() const override {
return output_destination_index_;
}
@@ -144,6 +151,10 @@ class TextScanOperator : public RelationalOperator {
}
private:
+ serialization::WorkOrder* createWorkOrderProto(const std::string &filename,
+ const std::size_t text_offset,
+ const std::size_t text_segment_size);
+
const std::string file_pattern_;
const char field_terminator_;
const bool process_escape_sequences_;
@@ -173,7 +184,6 @@ class TextScanWorkOrder : public WorkOrder {
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
* @param output_destination The InsertDestination to insert tuples.
- * @param storage_manager The StorageManager to use.
**/
TextScanWorkOrder(
const std::size_t query_id,
@@ -182,8 +192,14 @@ class TextScanWorkOrder : public WorkOrder {
const std::size_t text_segment_size,
const char field_terminator,
const bool process_escape_sequences,
- InsertDestination *output_destination,
- StorageManager *storage_manager);
+ InsertDestination *output_destination)
+ : WorkOrder(query_id),
+ filename_(filename),
+ text_offset_(text_offset),
+ text_segment_size_(text_segment_size),
+ field_terminator_(field_terminator),
+ process_escape_sequences_(process_escape_sequences),
+ output_destination_(DCHECK_NOTNULL(output_destination)) {}
~TextScanWorkOrder() override {}
@@ -233,7 +249,6 @@ class TextScanWorkOrder : public WorkOrder {
Tuple parseRow(const char **row_ptr,
const CatalogRelationSchema &relation) const;
-
/**
* @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
* a char literal. \p *literal_ptr will be modified to the last position
@@ -297,7 +312,6 @@ class TextScanWorkOrder : public WorkOrder {
const bool process_escape_sequences_;
InsertDestination *output_destination_;
- StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
};
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/TextScanOperator.proto
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.proto b/relational_operators/TextScanOperator.proto
deleted file mode 100644
index 8ead3f3..0000000
--- a/relational_operators/TextScanOperator.proto
+++ /dev/null
@@ -1,22 +0,0 @@
-// Copyright 2015 Pivotal Software, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-syntax = "proto2";
-
-package quickstep.serialization;
-
-message TextBlob {
- required fixed64 blob_id = 1;
- required uint64 size = 2;
-}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/UpdateOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 2130563..1b2979e 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -26,7 +26,9 @@
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionMessages.pb.h"
#include "query_execution/QueryExecutionUtil.hpp"
+#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
+#include "relational_operators/WorkOrder.pb.h"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
@@ -72,6 +74,27 @@ bool UpdateOperator::getAllWorkOrders(
return started_;
}
+bool UpdateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
+ if (blocking_dependencies_met_ && !started_) {
+ for (const block_id input_block_id : input_blocks_) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::UPDATE);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
+ proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+
+ container->addWorkOrderProto(proto, op_index_);
+ }
+ started_ = true;
+ }
+ return started_;
+}
+
void UpdateWorkOrder::execute() {
MutableBlockReference block(
storage_manager_->getBlockMutable(input_block_id_, relation_));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/UpdateOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index cebb9b5..f6c5053 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -45,6 +45,7 @@ class InsertDestination;
class Predicate;
class Scalar;
class StorageManager;
+class WorkOrderProtosContainer;
class WorkOrdersContainer;
/** \addtogroup RelationalOperators
@@ -99,6 +100,8 @@ class UpdateOperator : public RelationalOperator {
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) override;
+ bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
+
QueryContext::insert_destination_id getInsertDestinationID() const override {
return relocation_destination_index_;
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 60d4c8f..3ed065a 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -232,23 +232,14 @@ message TextScanWorkOrder {
}
}
-message TextSplitWorkOrder {
- extend WorkOrder {
- // All required.
- optional uint64 operator_index = 320;
- optional string filename = 321;
- optional bool process_escape_sequences = 322;
- }
-}
-
message UpdateWorkOrder {
extend WorkOrder {
// All required.
- optional uint64 operator_index = 336;
- optional int32 relation_id = 337;
- optional int32 insert_destination_index = 338;
- optional int32 predicate_index = 339;
- optional uint32 update_group_index = 340;
- optional fixed64 block_id = 341;
+ optional uint64 operator_index = 320;
+ optional int32 relation_id = 321;
+ optional int32 insert_destination_index = 322;
+ optional int32 predicate_index = 323;
+ optional uint32 update_group_index = 324;
+ optional fixed64 block_id = 325;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/c9214ecb/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index da42b4d..e078b84 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -396,8 +396,7 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
query_context->getInsertDestination(
- proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
- storage_manager);
+ proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)));
}
case serialization::UPDATE: {
LOG(INFO) << "Creating UpdateWorkOrder";
@@ -425,6 +424,10 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
const CatalogDatabaseLite &catalog_database,
const QueryContext &query_context) {
+ if (!proto.IsInitialized()) {
+ return false;
+ }
+
switch (proto.work_order_type()) {
case serialization::AGGREGATION: {
return proto.HasExtension(serialization::AggregationWorkOrder::block_id) &&