You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by na...@apache.org on 2016/06/09 20:11:44 UTC
[1/3] incubator-quickstep git commit: Fixed a potential segfault with
CompressedBlockBuilder.
Repository: incubator-quickstep
Updated Branches:
refs/heads/travis_sharedlibs ef6a4525b -> 18c9dc18c
Fixed a potential segfault with CompressedBlockBuilder.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/eebb4644
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/eebb4644
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/eebb4644
Branch: refs/heads/travis_sharedlibs
Commit: eebb4644f195fd82b28e77aafcf60344c33d6197
Parents: 096abe2
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Jun 9 00:43:16 2016 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jun 9 10:52:12 2016 -0700
----------------------------------------------------------------------
storage/CompressedBlockBuilder.cpp | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/eebb4644/storage/CompressedBlockBuilder.cpp
----------------------------------------------------------------------
diff --git a/storage/CompressedBlockBuilder.cpp b/storage/CompressedBlockBuilder.cpp
index 4a181eb..1ca0c07 100644
--- a/storage/CompressedBlockBuilder.cpp
+++ b/storage/CompressedBlockBuilder.cpp
@@ -321,6 +321,9 @@ void CompressedBlockBuilder::buildCompressedColumnStoreTupleStorageSubBlock(void
bool CompressedBlockBuilder::addTupleInternal(Tuple *candidate_tuple) {
DEBUG_ASSERT(candidate_tuple->size() == relation_.size());
+ // Ensure that the tuple is the owner of its values.
+ candidate_tuple->ensureLiteral();
+
// Modify dictionaries and maximum integers to reflect the new tuple's
// values. Keep track of what has changed in case a rollback is needed.
vector<CompressionDictionaryBuilder*> modified_dictionaries;
[2/3] incubator-quickstep git commit: Improved TextScanOperator.
Posted by na...@apache.org.
Improved TextScanOperator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4f8fdbe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4f8fdbe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4f8fdbe8
Branch: refs/heads/travis_sharedlibs
Commit: 4f8fdbe8451aed1ad1c07a8badb5be85bee1ff57
Parents: eebb464
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Jun 9 03:18:37 2016 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jun 9 10:52:40 2016 -0700
----------------------------------------------------------------------
query_optimizer/ExecutionGenerator.cpp | 1 -
relational_operators/CMakeLists.txt | 23 +-
relational_operators/TextScanOperator.cpp | 818 ++++++-------------
relational_operators/TextScanOperator.hpp | 286 +++----
relational_operators/WorkOrder.proto | 15 +-
relational_operators/WorkOrderFactory.cpp | 72 +-
.../tests/TextScanOperator_unittest.cpp | 1 -
relational_operators/tests/text_scan_input.txt | 8 +-
8 files changed, 384 insertions(+), 840 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 99c2a21..f9fd742 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -945,7 +945,6 @@ void ExecutionGenerator::convertCopyFrom(
physical_plan->file_name(),
physical_plan->column_delimiter(),
physical_plan->escape_strings(),
- FLAGS_parallelize_load,
*output_relation,
insert_destination_index));
insert_destination_proto->set_relational_op_index(scan_operator_index);
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index d2693eb..eb73c07 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -1,5 +1,7 @@
# Copyright 2011-2015 Quickstep Technologies LLC.
# Copyright 2015-2016 Pivotal Software, Inc.
+# Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+# University of Wisconsin—Madison.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,9 +18,6 @@
QS_PROTOBUF_GENERATE_CPP(relationaloperators_SortMergeRunOperator_proto_srcs
relationaloperators_SortMergeRunOperator_proto_hdrs
SortMergeRunOperator.proto)
-QS_PROTOBUF_GENERATE_CPP(relationaloperators_TextScanOperator_proto_srcs
- relationaloperators_TextScanOperator_proto_hdrs
- TextScanOperator.proto)
QS_PROTOBUF_GENERATE_CPP(relationaloperators_WorkOrder_proto_srcs
relationaloperators_WorkOrder_proto_hdrs
WorkOrder.proto)
@@ -61,9 +60,6 @@ add_library(quickstep_relationaloperators_SortRunGenerationOperator SortRunGener
SortRunGenerationOperator.hpp)
add_library(quickstep_relationaloperators_TableGeneratorOperator TableGeneratorOperator.cpp TableGeneratorOperator.hpp)
add_library(quickstep_relationaloperators_TextScanOperator TextScanOperator.cpp TextScanOperator.hpp)
-add_library(quickstep_relationaloperators_TextScanOperator_proto
- ${relationaloperators_TextScanOperator_proto_srcs}
- ${relationaloperators_TextScanOperator_proto_hdrs})
add_library(quickstep_relationaloperators_UpdateOperator UpdateOperator.cpp UpdateOperator.hpp)
add_library(quickstep_relationaloperators_WorkOrder ../empty_src.cpp WorkOrder.hpp)
add_library(quickstep_relationaloperators_WorkOrderFactory WorkOrderFactory.cpp WorkOrderFactory.hpp)
@@ -360,27 +356,19 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
glog
quickstep_catalog_CatalogAttribute
quickstep_catalog_CatalogRelation
- quickstep_catalog_CatalogRelationSchema
quickstep_catalog_CatalogTypedefs
quickstep_queryexecution_QueryContext
- quickstep_queryexecution_QueryExecutionMessages_proto
- quickstep_queryexecution_QueryExecutionTypedefs
- quickstep_queryexecution_QueryExecutionUtil
quickstep_queryexecution_WorkOrdersContainer
quickstep_relationaloperators_RelationalOperator
- quickstep_relationaloperators_TextScanOperator_proto
quickstep_relationaloperators_WorkOrder
quickstep_storage_InsertDestination
- quickstep_storage_StorageBlob
- quickstep_storage_StorageBlockInfo
- quickstep_storage_StorageManager
- quickstep_threading_ThreadIDBasedMap
quickstep_types_Type
quickstep_types_TypedValue
+ quickstep_types_containers_ColumnVector
+ quickstep_types_containers_ColumnVectorsValueAccessor
quickstep_types_containers_Tuple
quickstep_utility_Glob
quickstep_utility_Macros
- quickstep_utility_ThreadSafeQueue
tmb)
target_link_libraries(quickstep_relationaloperators_UpdateOperator
glog
@@ -430,7 +418,6 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
quickstep_relationaloperators_SortRunGenerationOperator
quickstep_relationaloperators_TableGeneratorOperator
quickstep_relationaloperators_TextScanOperator
- quickstep_relationaloperators_TextScanOperator_proto
quickstep_relationaloperators_UpdateOperator
quickstep_relationaloperators_WorkOrder_proto
quickstep_storage_StorageBlockInfo
@@ -438,7 +425,6 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
tmb)
target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
quickstep_relationaloperators_SortMergeRunOperator_proto
- quickstep_relationaloperators_TextScanOperator_proto
${PROTOBUF_LIBRARY})
# Module all-in-one library:
@@ -466,7 +452,6 @@ target_link_libraries(quickstep_relationaloperators
quickstep_relationaloperators_SortRunGenerationOperator
quickstep_relationaloperators_TableGeneratorOperator
quickstep_relationaloperators_TextScanOperator
- quickstep_relationaloperators_TextScanOperator_proto
quickstep_relationaloperators_UpdateOperator
quickstep_relationaloperators_WorkOrder
quickstep_relationaloperators_WorkOrderFactory
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 5acecbf..d2fd0cd 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -1,6 +1,8 @@
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
* Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,124 +22,30 @@
#include <algorithm>
#include <cctype>
#include <cstddef>
-#include <cstdint>
#include <cstdio>
#include <cstdlib>
-#include <cstring>
+#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "catalog/CatalogAttribute.hpp"
-#include "catalog/CatalogRelationSchema.hpp"
#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionUtil.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
-#include "relational_operators/TextScanOperator.pb.h"
#include "storage/InsertDestination.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "threading/ThreadIDBasedMap.hpp"
#include "types/Type.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/Tuple.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "utility/Glob.hpp"
-#include "gflags/gflags.h"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::isxdigit;
-using std::size_t;
-using std::sscanf;
-using std::string;
namespace quickstep {
-DEFINE_uint64(textscan_split_blob_size, 2,
- "Size of blobs in number of slots the input text files "
- "are split into in the TextScanOperator.");
-
-// Check if blob size is positive.
-static bool ValidateTextScanSplitBlobSize(const char *flagname,
- std::uint64_t blob_size) {
- if (blob_size == 0) {
- LOG(ERROR) << "--" << flagname << " must be greater than 0";
- return false;
- }
-
- return true;
-}
-
-static const volatile bool text_scan_split_blob_size_dummy = gflags::RegisterFlagValidator(
- &FLAGS_textscan_split_blob_size, &ValidateTextScanSplitBlobSize);
-
-namespace {
-
-// Detect whether '*search_string' contains a row-terminator (either line-feed
-// or carriage-return + line-feed) immediately before 'end_pos'. If
-// 'process_escape_sequences' is true, this function will also eliminate
-// false-positives from an escaped row-terminator. Returns the number of
-// characters in the row-terminator, or 0 if no terminator is detected.
-inline unsigned DetectRowTerminator(const char *search_string,
- std::size_t end_pos,
- const bool process_escape_sequences) {
- if (end_pos == 0) {
- // Empty string.
- return 0;
- }
-
- if (search_string[end_pos - 1] != '\n') {
- // String doesn't end in newline.
- return 0;
- }
-
- if (end_pos == 1) {
- // String is the single newline character.
- return 1;
- }
-
- const bool have_carriage_return = (search_string[end_pos - 2] == '\r');
- if (have_carriage_return && (end_pos == 2)) {
- // String is CR-LF and nothing else.
- return 2;
- }
-
- std::size_t backslashes = 0;
- // Count consecutive backslashes preceding the terminator. If there is an odd
- // number of backslashes, then the terminator is escaped and doesn't count as
- // a real terminator. If there is an even number of backslashes, then each
- // pair is an escaped backslash literal and the terminator still counts.
- if (process_escape_sequences) {
- end_pos = end_pos - 2 - have_carriage_return;
- while (end_pos != 0) {
- if (search_string[end_pos] == '\\') {
- ++backslashes;
- --end_pos;
- if ((end_pos == 0) && (search_string[0] == '\\')) {
- // Don't forget to count a backslash at the very beginning of a string.
- ++backslashes;
- }
- } else {
- break;
- }
- }
- }
-
- if (backslashes & 0x1) {
- return 0;
- } else {
- return 1 + have_carriage_return;
- }
-}
-
-} // namespace
-
bool TextScanOperator::getAllWorkOrders(
WorkOrdersContainer *container,
QueryContext *query_context,
@@ -155,116 +63,50 @@ bool TextScanOperator::getAllWorkOrders(
InsertDestination *output_destination =
query_context->getInsertDestination(output_destination_index_);
- if (parallelize_load_) {
- // Parallel implementation: Split work orders are generated for each file
- // being bulk-loaded. (More than one file can be loaded, because we support
- // glob() semantics in file name.) These work orders read the input file,
- // and split them in the blobs that can be parsed independently.
- if (blocking_dependencies_met_) {
- if (!work_generated_) {
- // First, generate text-split work orders.
- for (const auto &file : files) {
- container->addNormalWorkOrder(
- new TextSplitWorkOrder(query_id_,
- file,
- process_escape_sequences_,
- storage_manager,
- op_index_,
- scheduler_client_id,
- bus),
- op_index_);
- ++num_split_work_orders_;
- }
- work_generated_ = true;
- return false;
- } else {
- // Check if there are blobs to parse.
- while (!text_blob_queue_.empty()) {
- const TextBlob blob_work = text_blob_queue_.popOne();
- container->addNormalWorkOrder(
- new TextScanWorkOrder(query_id_,
- blob_work.blob_id,
- blob_work.size,
- field_terminator_,
- process_escape_sequences_,
- output_destination,
- storage_manager),
- op_index_);
- }
- // Done if all split work orders are completed, and no blobs are left to
- // process.
- return num_done_split_work_orders_.load(std::memory_order_acquire) == num_split_work_orders_ &&
- text_blob_queue_.empty();
- }
- }
- return false;
- } else {
- // Serial implementation.
- if (blocking_dependencies_met_ && !work_generated_) {
- for (const auto &file : files) {
+ // Text segment size set to 256KB.
+ constexpr std::size_t kTextSegmentSize = 0x40000u;
+
+ if (blocking_dependencies_met_ && !work_generated_) {
+ for (const std::string &file : files) {
+ // Use standard C library to retrieve the file size.
+ FILE *fp = std::fopen(file.c_str(), "rb");
+ std::fseek(fp, 0, SEEK_END);
+ const std::size_t file_size = std::ftell(fp);
+ std::fclose(fp);
+
+ std::size_t text_offset = 0;
+ while (text_offset < file_size) {
container->addNormalWorkOrder(
new TextScanWorkOrder(query_id_,
file,
+ text_offset,
+ std::min(kTextSegmentSize, file_size - text_offset),
field_terminator_,
process_escape_sequences_,
output_destination,
storage_manager),
op_index_);
+ text_offset += kTextSegmentSize;
}
- work_generated_ = true;
}
- return work_generated_;
- }
-}
-
-void TextScanOperator::receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
- switch (msg.type()) {
- case kSplitWorkOrderCompletionMessage: {
- num_done_split_work_orders_.fetch_add(1, std::memory_order_release);
- break;
- }
- case kNewTextBlobMessage: {
- serialization::TextBlob proto;
- CHECK(proto.ParseFromArray(msg.payload(), msg.payload_size()));
- text_blob_queue_.push(TextBlob(proto.blob_id(), proto.size()));
- break;
- }
- default:
- LOG(ERROR) << "Unknown feedback message type for TextScanOperator";
+ work_generated_ = true;
}
+ return work_generated_;
}
TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
const std::string &filename,
+ const std::size_t text_offset,
+ const std::size_t text_segment_size,
const char field_terminator,
const bool process_escape_sequences,
InsertDestination *output_destination,
StorageManager *storage_manager)
: WorkOrder(query_id),
- is_file_(true),
filename_(filename),
+ text_offset_(text_offset),
+ text_segment_size_(text_segment_size),
field_terminator_(field_terminator),
- text_blob_(0),
- text_size_(0),
- process_escape_sequences_(process_escape_sequences),
- output_destination_(output_destination),
- storage_manager_(storage_manager) {
- DCHECK(output_destination_ != nullptr);
- DCHECK(storage_manager_ != nullptr);
-}
-
-TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
- const block_id text_blob,
- const std::size_t text_size,
- const char field_terminator,
- const bool process_escape_sequences,
- InsertDestination *output_destination,
- StorageManager *storage_manager)
- : WorkOrder(query_id),
- is_file_(false),
- field_terminator_(field_terminator),
- text_blob_(text_blob),
- text_size_(text_size),
process_escape_sequences_(process_escape_sequences),
output_destination_(output_destination),
storage_manager_(storage_manager) {
@@ -274,439 +116,293 @@ TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
void TextScanWorkOrder::execute() {
const CatalogRelationSchema &relation = output_destination_->getRelation();
+ std::vector<Tuple> tuples;
- string current_row_string;
- if (is_file_) {
- FILE *file = std::fopen(filename_.c_str(), "r");
- if (file == nullptr) {
- throw TextScanReadError(filename_);
- }
+ constexpr std::size_t kSmallBufferSize = 0x4000;
+ char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
- bool have_row = false;
- do {
- current_row_string.clear();
- have_row = readRowFromFile(file, &current_row_string);
- if (have_row) {
- Tuple tuple = parseRow(current_row_string, relation);
- output_destination_->insertTupleInBatch(tuple);
- }
- } while (have_row);
-
- std::fclose(file);
- } else {
- BlobReference blob = storage_manager_->getBlob(text_blob_);
- const char *blob_pos = static_cast<const char*>(blob->getMemory());
- const char *blob_end = blob_pos + text_size_;
- bool have_row = false;
- do {
- current_row_string.clear();
- have_row = readRowFromBlob(&blob_pos, blob_end, &current_row_string);
- if (have_row) {
- Tuple tuple = parseRow(current_row_string, relation);
- output_destination_->insertTupleInBatch(tuple);
- }
- } while (have_row);
-
- // Drop the consumed blob produced by TextSplitWorkOrder.
- blob.release();
- storage_manager_->deleteBlockOrBlobFile(text_blob_);
+ // Read text segment into buffer.
+ FILE *file = std::fopen(filename_.c_str(), "rb");
+ std::fseek(file, text_offset_, SEEK_SET);
+ std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+ if (bytes_read != text_segment_size_) {
+ throw TextScanReadError(filename_);
}
-}
-char TextScanWorkOrder::ParseOctalLiteral(const std::string &row_string,
- std::size_t *start_pos) {
- const std::size_t stop_pos = std::min(row_string.length(), *start_pos + 3);
-
- int value = 0;
- for (; *start_pos < stop_pos; ++*start_pos) {
- int char_value = row_string[*start_pos] - '0';
- if ((char_value >= 0) && (char_value < 8)) {
- value = value * 8 + char_value;
- } else {
- return value;
+ // Locate the first newline character.
+ const char *buffer_end = buffer + text_segment_size_;
+ const char *row_ptr = buffer;
+ if (text_offset_ != 0) {
+ while (row_ptr < buffer_end && *row_ptr != '\n') {
+ ++row_ptr;
}
+ } else {
+ --row_ptr;
}
- return value;
-}
-
-char TextScanWorkOrder::ParseHexLiteral(const std::string &row_string,
- std::size_t *start_pos) {
- const std::size_t stop_pos = std::min(row_string.length(), *start_pos + 2);
+ if (row_ptr >= buffer_end) {
+ // This block does not even contain a newline character.
+ return;
+ }
- int value = 0;
- for (; *start_pos < stop_pos; ++*start_pos) {
- if (!std::isxdigit(row_string[*start_pos])) {
- break;
- }
+ // Locate the last newline character.
+ const char *end_ptr = buffer_end - 1;
+ while (end_ptr > row_ptr && *end_ptr != '\n') {
+ --end_ptr;
+ }
- int char_value;
- if (std::isdigit(row_string[*start_pos])) {
- char_value = row_string[*start_pos] - '0';
- } else if (std::islower(row_string[*start_pos])) {
- char_value = row_string[*start_pos] - 'a' + 10;
+ // Advance both row_ptr and end_ptr by 1.
+ ++row_ptr;
+ ++end_ptr;
+ // Now row_ptr is pointing to the first character RIGHT AFTER the FIRST newline
+ // character in this text segment, and end_ptr is pointing to the first character
+ // RIGHT AFTER the LAST newline character in this text segment.
+
+ // Process the tuples which are between the first newline character and the
+ // last newline character.
+ while (row_ptr < end_ptr) {
+ if (*row_ptr == '\r' || *row_ptr == '\n') {
+ // Skip empty lines.
+ ++row_ptr;
} else {
- char_value = row_string[*start_pos] - 'A' + 10;
+ tuples.emplace_back(parseRow(&row_ptr, relation));
}
-
- value = value * 16 + char_value;
}
- return value;
-}
+ // Process the tuple that is right after the last newline character.
+ // NOTE(jianqiao): dynamic_read_size is trying to balance between the cases
+ // that the last tuple is very small / very large.
+ std::size_t dynamic_read_size = 1024;
+ std::string row_string;
+ std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+ bool has_reached_end = false;
+ do {
+ bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+ std::size_t bytes_to_copy = bytes_read;
-bool TextScanWorkOrder::readRowFromFile(FILE *file, std::string *row_string) const {
- // Read up to 1023 chars + null-terminator at a time.
- static constexpr std::size_t kRowBufferSize = 1024;
- char row_buffer[kRowBufferSize];
- for (;;) {
- char *read_string = std::fgets(row_buffer, sizeof(row_buffer), file);
- if (read_string == nullptr) {
- if (std::feof(file)) {
- if (row_string->empty()) {
- return false;
- } else {
- throw TextScanFormatError("File ended without delimiter");
- }
- } else {
- throw TextScanReadError(filename_);
+ for (std::size_t i = 0; i < bytes_read; ++i) {
+ if (buffer[i] == '\n') {
+ bytes_to_copy = i + 1;
+ has_reached_end = true;
+ break;
}
}
-
- // Append the contents of the buffer to '*row_string', and see if we've
- // reached a genuine row-terminator yet.
- row_string->append(row_buffer);
- if (removeRowTerminator(row_string)) {
- row_string->push_back(field_terminator_);
- return true;
+ if (!has_reached_end && bytes_read != dynamic_read_size) {
+ has_reached_end = true;
}
- }
-}
-bool TextScanWorkOrder::readRowFromBlob(const char **start_pos,
- const char *end_pos,
- std::string *row_string) const {
- while (*start_pos != end_pos) {
- const char *next_newline = static_cast<const char*>(std::memchr(
- *start_pos,
- '\n',
- end_pos - *start_pos));
-
- if (next_newline == nullptr) {
- throw TextScanFormatError("File ended without delimiter");
- }
+ row_string.append(buffer, bytes_to_copy);
+ dynamic_read_size = std::min(dynamic_read_size * 2, kSmallBufferSize);
+ } while (!has_reached_end);
- // Append the blob's contents through the next newline to '*row_string',
- // and see if we've reached a genuine row-terminator yet.
- row_string->append(*start_pos, next_newline - *start_pos + 1);
- *start_pos = next_newline + 1;
- if (removeRowTerminator(row_string)) {
- row_string->push_back(field_terminator_);
- return true;
+ if (!row_string.empty()) {
+ if (row_string.back() != '\n') {
+ row_string.push_back('\n');
}
+ row_ptr = row_string.c_str();
+ tuples.emplace_back(parseRow(&row_ptr, relation));
}
- if (row_string->empty()) {
- return false;
- } else {
- throw TextScanFormatError("File ended without delimiter");
- }
-}
-
-bool TextScanWorkOrder::removeRowTerminator(std::string *row_string) const {
- unsigned row_term_chars = DetectRowTerminator(row_string->c_str(),
- row_string->length(),
- process_escape_sequences_);
- if (row_term_chars == 0) {
- return false;
- } else {
- row_string->resize(row_string->length() - row_term_chars);
- return true;
- }
-}
-
-bool TextScanWorkOrder::extractFieldString(const std::string &row_string,
- std::size_t *start_pos,
- std::string *field_string) const {
- // Check for NULL literal string.
- if (process_escape_sequences_
- && (row_string.length() - *start_pos >= 3)
- && (row_string[*start_pos] == '\\')
- && (row_string[*start_pos + 1] == 'N')
- && (row_string[*start_pos + 2] == field_terminator_)) {
- *start_pos += 3;
- return false;
- }
-
- // Scan up until terminator, expanding backslashed escape sequences as we go.
- std::size_t terminator_pos = row_string.find(field_terminator_, *start_pos);
- std::size_t scan_pos = *start_pos;
-
- if (process_escape_sequences_) {
- for (;;) {
- std::size_t backslash_pos = row_string.find('\\', scan_pos);
- if ((backslash_pos == std::string::npos) || (backslash_pos >= terminator_pos)) {
- // No more backslashes, or the next backslash is beyond the field
- // terminator.
- break;
- }
-
- // Copy up to the backslash.
- field_string->append(row_string, scan_pos, backslash_pos - scan_pos);
-
- if (backslash_pos + 1 == terminator_pos) {
- // The terminator we found was escaped by a backslash, so append the
- // literal terminator and re-scan for the next terminator character.
- field_string->push_back(field_terminator_);
- scan_pos = terminator_pos + 1;
- terminator_pos = row_string.find(field_terminator_, scan_pos);
- continue;
+ std::fclose(file);
+ free(buffer);
+
+ // Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
+ ColumnVectorsValueAccessor column_vectors;
+ std::size_t attr_id = 0;
+ for (const auto &attribute : relation) {
+ const Type &attr_type = attribute.getType();
+ if (attr_type.isVariableLength()) {
+ std::unique_ptr<IndirectColumnVector> column(
+ new IndirectColumnVector(attr_type, tuples.size()));
+ for (const auto &tuple : tuples) {
+ column->appendTypedValue(tuple.getAttributeValue(attr_id));
}
-
- // Expand escape sequence.
- switch (row_string[backslash_pos + 1]) {
- case '0': // Fallthrough for octal digits.
- case '1':
- case '2':
- case '3':
- case '4':
- case '5':
- case '6':
- case '7':
- // Octal char literal.
- scan_pos = backslash_pos + 1;
- field_string->push_back(ParseOctalLiteral(row_string, &scan_pos));
- break;
- case 'N': {
- // Null literal after some other column data.
- throw TextScanFormatError(
- "Null indicator '\\N' encountered in text scan mixed in with "
- "other column data.");
- }
- case '\\':
- // Backslash.
- field_string->push_back('\\');
- scan_pos = backslash_pos + 2;
- break;
- case 'b':
- // Backspace.
- field_string->push_back('\b');
- scan_pos = backslash_pos + 2;
- break;
- case 'f':
- // Form-feed.
- field_string->push_back('\f');
- scan_pos = backslash_pos + 2;
- break;
- case 'n':
- // Newline.
- field_string->push_back('\n');
- scan_pos = backslash_pos + 2;
- break;
- case 'r':
- // Carriage return.
- field_string->push_back('\r');
- scan_pos = backslash_pos + 2;
- break;
- case 't':
- // Tab.
- field_string->push_back('\t');
- scan_pos = backslash_pos + 2;
- break;
- case 'v':
- // Vertical tab.
- field_string->push_back('\v');
- scan_pos = backslash_pos + 2;
- break;
- case 'x':
- if ((backslash_pos + 2 < row_string.length()) && std::isxdigit(row_string[backslash_pos + 2])) {
- // Hexidecimal char literal.
- scan_pos = backslash_pos + 2;
- field_string->push_back(ParseHexLiteral(row_string, &scan_pos));
- } else {
- // Just an escaped 'x' with no hex digits.
- field_string->push_back('x');
- scan_pos = backslash_pos + 2;
- }
- break;
- default:
- // Append escaped character as-is.
- field_string->push_back(row_string[backslash_pos + 1]);
- scan_pos = backslash_pos + 2;
- break;
+ column_vectors.addColumn(column.release());
+ } else {
+ std::unique_ptr<NativeColumnVector> column(
+ new NativeColumnVector(attr_type, tuples.size()));
+ for (const auto &tuple : tuples) {
+ column->appendTypedValue(tuple.getAttributeValue(attr_id));
}
+ column_vectors.addColumn(column.release());
}
+ ++attr_id;
}
- DCHECK_NE(terminator_pos, std::string::npos);
- field_string->append(row_string, scan_pos, terminator_pos - scan_pos);
- *start_pos = terminator_pos + 1;
- return true;
+ // Bulk insert the tuples.
+ output_destination_->bulkInsertTuples(&column_vectors);
}
-Tuple TextScanWorkOrder::parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const {
+Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
+ const CatalogRelationSchema &relation) const {
std::vector<TypedValue> attribute_values;
- std::size_t pos = 0;
+ bool is_null_literal;
+ bool has_reached_end_of_line = false;
std::string value_str;
- CatalogRelationSchema::const_iterator attr_it = relation.begin();
- while (pos < row_string.length()) {
- if (attr_it == relation.end()) {
- throw TextScanFormatError("Row has too many fields");
+ for (const auto &attr : relation) {
+ if (has_reached_end_of_line) {
+ throw TextScanFormatError("Row has too few fields");
}
value_str.clear();
- if (extractFieldString(row_string, &pos, &value_str)) {
- attribute_values.emplace_back();
- if (!attr_it->getType().parseValueFromString(value_str, &(attribute_values.back()))) {
- throw TextScanFormatError("Failed to parse value");
- }
- } else {
+ extractFieldString(row_ptr,
+ &is_null_literal,
+ &has_reached_end_of_line,
+ &value_str);
+
+ if (is_null_literal) {
// NULL literal.
- if (!attr_it->getType().isNullable()) {
+ if (!attr.getType().isNullable()) {
throw TextScanFormatError(
"NULL literal '\\N' was specified for a column with a "
"non-nullable Type");
}
-
- attribute_values.emplace_back(attr_it->getType().makeNullValue());
+ attribute_values.emplace_back(attr.getType().makeNullValue());
+ } else {
+ attribute_values.emplace_back();
+ if (!attr.getType().parseValueFromString(value_str, &(attribute_values.back()))) {
+ throw TextScanFormatError("Failed to parse value");
+ }
}
-
- ++attr_it;
}
- if (attr_it != relation.end()) {
- throw TextScanFormatError("Row has too few fields");
+ if (!has_reached_end_of_line) {
+ throw TextScanFormatError("Row has too many fields");
}
return Tuple(std::move(attribute_values));
}
-void TextSplitWorkOrder::execute() {
- std::FILE *file = std::fopen(filename_.c_str(), "r");
- if (!file) {
- throw TextScanReadError(filename_);
- }
-
- bool eof = false;
- do {
- // Allocate new blob, if current is empty.
- if (0 == remainingBlobBytes()) {
- allocateBlob();
- }
-
- // Read the into the unwritten part of blob.
- std::size_t bytes =
- std::fread(writeableBlobAddress(), 1, remainingBlobBytes(), file);
- eof = bytes < remainingBlobBytes();
- written_ += bytes;
-
- // Write the current blob to queue for processing.
- sendBlobInfoToOperator(!eof /* write_row_aligned */);
- } while (!eof);
-
- std::fclose(file);
+void TextScanWorkOrder::extractFieldString(const char **field_ptr,
+ bool *is_null_literal,
+ bool *has_reached_end_of_line,
+ std::string *field_string) const {
+ const char *cur_ptr = *field_ptr;
+ *is_null_literal = false;
- // Notify the operator about the completion of this Work Order.
- FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage,
- operator_index_,
- nullptr /* payload */,
- 0 /* payload_size */,
- false /* ownership */);
- SendFeedbackMessage(bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, msg);
-}
+ // Check for NULL literal string.
+ if (process_escape_sequences_ && cur_ptr[0] == '\\' && cur_ptr[1] == 'N') {
+ cur_ptr += 2;
-// Allocate new blob.
-void TextSplitWorkOrder::allocateBlob() {
- text_blob_id_ = storage_manager_->createBlob(FLAGS_textscan_split_blob_size);
- text_blob_ = storage_manager_->getBlobMutable(text_blob_id_);
- blob_size_ = text_blob_->size();
- written_ = 0;
-}
+ // Skip '\r'
+ if (*cur_ptr == '\r') {
+ ++cur_ptr;
+ }
-// Find the last row terminator in the blob.
-std::size_t TextSplitWorkOrder::findLastRowTerminator() {
- std::size_t found = 0;
- const char *blob = static_cast<const char *>(text_blob_->getMemory());
-
- for (std::size_t index = written_;
- index != 0;
- --index) {
- if (DetectRowTerminator(blob, index, process_escape_sequences_)) {
- found = index;
- break;
+ const char c = *cur_ptr;
+ if (c == field_terminator_ || c == '\n') {
+ *is_null_literal = true;
+ *has_reached_end_of_line = (c == '\n');
+ *field_ptr = cur_ptr + 1;
+ return;
}
}
- // TODO(quickstep-team): Design a way to handle long rows that are larger than
- // the configured blob size.
- CHECK_NE(0u, found) << "No row terminator found in " << FLAGS_textscan_split_blob_size
- << "-slot chunk of " << filename_;
- return found;
-}
+ // Not a NULL literal string, rewind cur_ptr to the start position for parsing.
+ cur_ptr = *field_ptr;
-void TextSplitWorkOrder::sendBlobInfoToOperator(const bool write_row_aligned) {
- std::size_t text_len = written_;
- std::string residue;
- if (write_row_aligned) {
- // Find last row terminator in current blob.
- text_len = findLastRowTerminator();
-
- // Copy the residual bytes after the last row terminator.
- residue = std::string(
- static_cast<char *>(text_blob_->getMemoryMutable()) + text_len,
- written_ - text_len);
- }
+ if (!process_escape_sequences_) {
+ // Simply copy until field_terminator or '\n'.
+ for (;; ++cur_ptr) {
+ const char c = *cur_ptr;
+ if (c == field_terminator_) {
+ *has_reached_end_of_line = false;
+ break;
+ } else if (c == '\n') {
+ *has_reached_end_of_line = true;
+ break;
+ }
- // Notify the operator for the split-up blob.
- serialization::TextBlob proto;
- proto.set_blob_id(text_blob_id_);
- proto.set_size(text_len);
-
- const std::size_t payload_size = proto.ByteSize();
- // NOTE(zuyu): 'payload' gets released by FeedbackMessage's destructor.
- char *payload = static_cast<char *>(std::malloc(payload_size));
- CHECK(proto.SerializeToArray(payload, payload_size));
-
- const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
- FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage,
- operator_index_,
- payload,
- payload_size);
- SendFeedbackMessage(bus_, worker_thread_client_id, scheduler_client_id_, feedback_msg);
-
- // Notify Foreman for the avaiable work order on the blob.
- serialization::WorkOrdersAvailableMessage message_proto;
- message_proto.set_operator_index(operator_index_);
-
- // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
- const size_t message_proto_length = message_proto.ByteSize();
- char *message_proto_bytes = static_cast<char*>(std::malloc(message_proto_length));
- CHECK(message_proto.SerializeToArray(message_proto_bytes, message_proto_length));
-
- tmb::TaggedMessage tagged_message(static_cast<const void *>(message_proto_bytes),
- message_proto_length,
- kWorkOrdersAvailableMessage);
- std::free(message_proto_bytes);
-
- // Send new work order available message to Foreman.
- const tmb::MessageBus::SendStatus send_status =
- QueryExecutionUtil::SendTMBMessage(
- bus_,
- worker_thread_client_id,
- scheduler_client_id_,
- std::move(tagged_message));
- CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not "
- "be sent from thread with TMB client ID "
- << worker_thread_client_id << " to Foreman with TMB client "
- "ID " << scheduler_client_id_;
-
- if (residue.size()) {
- // Allocate new blob, and copy residual bytes from last blob.
- allocateBlob();
- std::memcpy(writeableBlobAddress(), residue.data(), residue.size());
- written_ += residue.size();
+ // Ignore '\r'
+ if (c != '\r') {
+ field_string->push_back(c);
+ }
+ }
+ } else {
+ for (;; ++cur_ptr) {
+ const char c = *cur_ptr;
+ if (c == '\\') {
+ ++cur_ptr;
+ const char first_escaped_character = *cur_ptr;
+ switch (first_escaped_character) {
+ case '0': // Fallthrough for octal digits.
+ case '1':
+ case '2':
+ case '3':
+ case '4':
+ case '5':
+ case '6':
+ case '7':
+ field_string->push_back(ParseOctalLiteral(&cur_ptr));
+ break;
+ case 'N': {
+ // Null literal after some other column data.
+ throw TextScanFormatError(
+ "Null indicator '\\N' encountered in text scan mixed in with "
+ "other column data.");
+ }
+ case '\\':
+ // Backslash.
+ field_string->push_back('\\');
+ break;
+ case 'b':
+ // Backspace.
+ field_string->push_back('\b');
+ break;
+ case 'f':
+ // Form-feed.
+ field_string->push_back('\f');
+ break;
+ case 'n':
+ // Newline.
+ field_string->push_back('\n');
+ break;
+ case 'r':
+ // Carriage return.
+ field_string->push_back('\r');
+ break;
+ case 't':
+ // Tab.
+ field_string->push_back('\t');
+ break;
+ case 'v':
+ // Vertical tab.
+ field_string->push_back('\v');
+ break;
+ case 'x':
+ if (std::isxdigit(cur_ptr[1])) {
+ // Hexidecimal char literal.
+ ++cur_ptr;
+ field_string->push_back(ParseHexLiteral(&cur_ptr));
+ } else {
+ // Just an escaped 'x' with no hex digits.
+ field_string->push_back('x');
+ }
+ break;
+ case '\n':
+ throw TextScanFormatError(
+ "Backslash line splicing is not supported.");
+ default:
+ // Append escaped character as-is.
+ field_string->push_back(first_escaped_character);
+ break;
+ }
+ } else if (c == field_terminator_) {
+ *has_reached_end_of_line = false;
+ break;
+ } else if (c == '\n') {
+ *has_reached_end_of_line = true;
+ break;
+ } else {
+ if (c != '\r') {
+ // Ignore '\r'
+ field_string->push_back(c);
+ }
+ }
+ }
}
+ *field_ptr = cur_ptr + 1;
}
} // namespace quickstep
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 3cda65b..d73e7dd 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -1,6 +1,8 @@
/**
* Copyright 2011-2015 Quickstep Technologies LLC.
* Copyright 2015-2016 Pivotal Software, Inc.
+ * Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ * University of Wisconsin—Madison.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,26 +20,18 @@
#ifndef QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
-#include <atomic>
+#include <cctype>
#include <cstddef>
-#include <cstdint>
-#include <cstdio>
#include <exception>
#include <string>
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
#include "types/containers/Tuple.hpp"
#include "utility/Macros.hpp"
-#include "utility/ThreadSafeQueue.hpp"
-
-#include "glog/logging.h"
#include "tmb/id_typedefs.h"
@@ -98,26 +92,11 @@ class TextScanFormatError : public std::exception {
};
/**
- * @brief A structure for text data blobs.
- */
-struct TextBlob {
- TextBlob(const block_id text_blob_id, const std::size_t text_size)
- : blob_id(text_blob_id), size(text_size) {}
- block_id blob_id;
- std::size_t size;
-};
-
-/**
* @brief An operator which reads tuples from a text file and inserts them into
* a relation.
**/
class TextScanOperator : public RelationalOperator {
public:
- enum FeedbackMessageType : WorkOrder::FeedbackMessageType {
- kNewTextBlobMessage,
- kSplitWorkOrderCompletionMessage,
- };
-
/**
* @brief Constructor
*
@@ -130,29 +109,22 @@ class TextScanOperator : public RelationalOperator {
* the text file.
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
- * @param parallelize_load Parallelize the load process by th spliting file
- * into blobs, and generating separate work-orders for each of them.
* @param output_relation The output relation.
* @param output_destination_index The index of the InsertDestination in the
* QueryContext to insert tuples.
**/
- TextScanOperator(
- const std::size_t query_id,
- const std::string &file_pattern,
- const char field_terminator,
- const bool process_escape_sequences,
- const bool parallelize_load,
- const CatalogRelation &output_relation,
- const QueryContext::insert_destination_id output_destination_index)
+ TextScanOperator(const std::size_t query_id,
+ const std::string &file_pattern,
+ const char field_terminator,
+ const bool process_escape_sequences,
+ const CatalogRelation &output_relation,
+ const QueryContext::insert_destination_id output_destination_index)
: RelationalOperator(query_id),
file_pattern_(file_pattern),
field_terminator_(field_terminator),
process_escape_sequences_(process_escape_sequences),
- parallelize_load_(parallelize_load),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
- num_done_split_work_orders_(0),
- num_split_work_orders_(0),
work_generated_(false) {}
~TextScanOperator() override {}
@@ -171,23 +143,14 @@ class TextScanOperator : public RelationalOperator {
return output_relation_.getID();
}
- void receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) override;
-
private:
const std::string file_pattern_;
const char field_terminator_;
const bool process_escape_sequences_;
- const bool parallelize_load_;
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
- ThreadSafeQueue<TextBlob> text_blob_queue_;
- std::atomic<std::uint32_t> num_done_split_work_orders_;
- std::uint32_t num_split_work_orders_;
-
- // Indicates if work order to load file is generated for non-parallel load, and
- // if work order to split file to blobs is generated for parallel load.
bool work_generated_;
DISALLOW_COPY_AND_ASSIGN(TextScanOperator);
@@ -203,7 +166,9 @@ class TextScanWorkOrder : public WorkOrder {
*
* @param query_id The ID of the query to which this WorkOrder belongs.
* @param filename The name of the text file to bulk insert.
- * @param field_terminator The string which separates attribute values in
+ * @param text_offset The start position in the text file to start text scan.
+ * @param text_segment_size The size of text segment to be scanned.
+ * @param field_terminator The character which separates attribute values in
* the text file.
* @param process_escape_sequences Whether to decode escape sequences in the
* text file.
@@ -213,28 +178,8 @@ class TextScanWorkOrder : public WorkOrder {
TextScanWorkOrder(
const std::size_t query_id,
const std::string &filename,
- const char field_terminator,
- const bool process_escape_sequences,
- InsertDestination *output_destination,
- StorageManager *storage_manager);
-
- /**
- * @brief Constructor.
- *
- * @param query_id The ID of the query to which this WorkOrder belongs.
- * @param text_blob Blob ID containing the data to be scanned.
- * @param text_size Size of the data in the blob.
- * @param field_terminator The character which separates attribute values in
- * the text file.
- * @param process_escape_sequences Whether to decode escape sequences in the
- * text file.
- * @param output_destination The InsertDestination to write the read tuples.
- * @param storage_manager The StorageManager to use.
- */
- TextScanWorkOrder(
- const std::size_t query_id,
- const block_id text_blob,
- const std::size_t text_size,
+ const std::size_t text_offset,
+ const std::size_t text_segment_size,
const char field_terminator,
const bool process_escape_sequences,
InsertDestination *output_destination,
@@ -255,141 +200,106 @@ class TextScanWorkOrder : public WorkOrder {
void execute() override;
private:
- // Parse up to three octal digits (0-7) starting at '*start_pos' in
- // 'row_string' as a char literal. '*start_pos' will be modified to
- // the first position AFTER the parsed octal digits.
- static char ParseOctalLiteral(const std::string &row_string,
- std::size_t *start_pos);
-
- // Parse up to two hexadecimal digits (0-F, case insensitive) starting at
- // '*start_pos' in 'row_string' as a char literal. '*start_pos' will be
- // modified to the first position AFTER the parsed hexadecimal digits.
- static char ParseHexLiteral(const std::string &row_string,
- std::size_t *start_pos);
-
- // Read the next text row from the open FILE stream '*file' into
- // '*row_string'. Returns false if end-of-file is reached and there are no
- // more rows, true if a row string was successfully read. For ease of
- // parsing, '*row_string' has the trailing row-terminator removed and
- // replaced with a field-terminator.
- bool readRowFromFile(FILE *file, std::string *row_string) const;
-
- // Read the next text from blob memory starting at '**start_pos' and ending
- // at '*end_pos' into '*row_string'. Returns false if the end of the blob is
- // reached and there are no more rows, true if a row was successfully read.
- // For ease of parsing, '*row_string' has the trailing row-terminator removed
- // and replaced with a field-terminator. After call '*start_pos' points to
- // first character AFTER the read row in the blob.
- bool readRowFromBlob(const char **start_pos,
- const char *end_pos,
- std::string *row_string) const;
-
- // Trim a row-terminator (newline or carriage-return + newline) off the end
- // of '*row_string'. Returns true if the row-terminator was successfully
- // removed, false if '*row_string' did not end in a row-terminator.
- bool removeRowTerminator(std::string *row_string) const;
-
- // Extract a field string starting at '*start_pos' in 'row_string' into
- // '*field_string'. This method also expands escape sequences if
- // 'process_escape_sequences_' is true. Returns true if a field string was
- // successfully extracted, false in the special case where the NULL-literal
- // string "\N" was found. Throws TextScanFormatError if text was malformed.
- bool extractFieldString(const std::string &row_string,
- std::size_t *start_pos,
- std::string *field_string) const;
-
- // Make a tuple by parsing all of the individual fields specified in
- // 'row_string'.
- Tuple parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const;
-
- const bool is_file_;
- const std::string filename_;
- const char field_terminator_;
- const block_id text_blob_;
- const std::size_t text_size_;
- const bool process_escape_sequences_;
-
- InsertDestination *output_destination_;
- StorageManager *storage_manager_;
-
- DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
-};
-
-/**
- * @brief A WorkOrder to split the file into blobs of text that can be processed
- * separately.
- **/
-class TextSplitWorkOrder : public WorkOrder {
- public:
/**
- * @brief Constructor.
+ * @brief Extract a field string starting at \p *field_ptr. This method also
+ * expands escape sequences if \p process_escape_sequences_ is true.
+ * Throws TextScanFormatError if text was malformed.
*
- * @param query_id The ID of the query to which this WorkOrder belongs.
- * @param filename File to split into row-aligned blobs.
- * @param process_escape_sequences Whether to decode escape sequences in the
- * text file.
- * @param storage_manager The StorageManager to use.
- * @param operator_index Operator index of the current operator. This is used
- * to send new-work available message to Foreman.
- * @param scheduler_client_id The TMB client ID of the scheduler thread.
- * @param bus A pointer to the TMB.
+ * @param field_ptr \p *field_ptr points to the current position of the input
+ * char stream for parsing. The overall char stream must end with a
+ * newline character. After the call, \p *field_ptr will be modified to
+ * the start position of the NEXT field string.
+ * @param is_null_literal OUTPUT parameter. Set to true if the NULL-literal
+ * string "\N" was found.
+ * @param has_reached_end_of_line OUTPUT parameter. Set to true if the newline
+ * character was encountered.
+ * @param field_string OUTPUT parameter. Set to the extracted field string.
*/
- TextSplitWorkOrder(const std::size_t query_id,
- const std::string &filename,
- const bool process_escape_sequences,
- StorageManager *storage_manager,
- const std::size_t operator_index,
- const tmb::client_id scheduler_client_id,
- MessageBus *bus)
- : WorkOrder(query_id),
- filename_(filename),
- process_escape_sequences_(process_escape_sequences),
- storage_manager_(DCHECK_NOTNULL(storage_manager)),
- operator_index_(operator_index),
- scheduler_client_id_(scheduler_client_id),
- bus_(DCHECK_NOTNULL(bus)) {}
+ void extractFieldString(const char **field_ptr,
+ bool *is_null_literal,
+ bool *has_reached_end_of_line,
+ std::string *field_string) const;
/**
- * @exception TextScanReadError The text file could not be opened for
- * reading.
+ * @brief Make a tuple by parsing all of the individual fields from a char stream.
+ *
+ * @param row_ptr \p *row_ptr points to the current position of the input char stream
+ * for parsing. The overall char stream must end with a newline character.
+ * After the call, \p *row_ptr will be modified to the start position of
+ * the NEXT text row.
+ * @param relation The relation schema for the tuple.
+ * @return The tuple parsed from the char stream.
*/
- void execute() override;
-
- private:
- // Allocate a new blob.
- void allocateBlob();
-
- // Find the last row terminator in current blob.
- std::size_t findLastRowTerminator();
+ Tuple parseRow(const char **row_ptr,
+ const CatalogRelationSchema &relation) const;
- // Send the blob info to its operator via TMB.
- void sendBlobInfoToOperator(const bool write_row_aligned);
- // Get the writeable address (unwritten chunk) in current blob.
- inline char* writeableBlobAddress() {
- return static_cast<char*>(text_blob_->getMemoryMutable()) + written_;
+ /**
+ * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
+ * a char literal. \p *literal_ptr will be modified to the last position
+ * of the parsed octal digits.
+ *
+ * @param literal_ptr \p *literal_ptr points to the current position of the
+ * input char stream for parsing. The overall char stream must end with
+ * a newline character.
+ * @return The char literal from the parsed octal digits.
+ */
+ inline static char ParseOctalLiteral(const char **literal_ptr) {
+ int value = 0;
+ const char *ptr = *literal_ptr;
+ for (int i = 0; i < 3; ++i, ++ptr) {
+ const int char_value = *ptr - '0';
+ if ((char_value >= 0) && (char_value < 8)) {
+ value = value * 8 + char_value;
+ } else {
+ break;
+ }
+ }
+ *literal_ptr = ptr - 1;
+ return value;
}
- // Number of bytes remaining to be written.
- inline std::size_t remainingBlobBytes() const {
- return blob_size_ - written_;
+ /**
+ * @brief Parse up to two hexadecimal digits (0-F, case insensitive) starting
+ * at \p *literal_ptr as a char literal. \p *literal_ptr will be modified
+ * to the last position of the parsed hexadecimal digits.
+ *
+ * @param literal_ptr \p *literal_ptr points to the current position of the
+ * input char stream for parsing. The overall char stream must end with
+ * a newline character.
+ * @return The char literal from the parsed hexadecimal digits.
+ */
+ inline static char ParseHexLiteral(const char **literal_ptr) {
+ int value = 0;
+ const char *ptr = *literal_ptr;
+ for (int i = 0; i < 2; ++i, ++ptr) {
+ const char c = *ptr;
+ int char_value;
+ if (std::isdigit(c)) {
+ char_value = c - '0';
+ } else if (c >= 'a' && c <= 'f') {
+ char_value = c - 'a' + 10;
+ } else if (c >= 'A' && c <= 'F') {
+ char_value = c - 'A' + 10;
+ } else {
+ break;
+ }
+ value = value * 16 + char_value;
+ }
+ *literal_ptr = ptr - 1;
+ return value;
}
- const std::string filename_; // File to split.
+ const std::string filename_;
+ const std::size_t text_offset_;
+ const std::size_t text_segment_size_;
+ const char field_terminator_;
const bool process_escape_sequences_;
+ InsertDestination *output_destination_;
StorageManager *storage_manager_;
- const std::size_t operator_index_; // Opeartor index.
- const tmb::client_id scheduler_client_id_; // The scheduler's TMB client ID.
- MessageBus *bus_;
-
- MutableBlobReference text_blob_; // Mutable reference to current blob.
- block_id text_blob_id_; // Current blob ID.
- std::size_t written_ = 0; // Bytes written in current blob.
- std::size_t blob_size_ = 0; // Size of the current blob.
-
- DISALLOW_COPY_AND_ASSIGN(TextSplitWorkOrder);
+ DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
};
/** @} */
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index fd731f7..60d4c8f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -1,5 +1,7 @@
// Copyright 2011-2015 Quickstep Technologies LLC.
// Copyright 2015-2016 Pivotal Software, Inc.
+// Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+// University of Wisconsin—Madison.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -18,7 +20,6 @@ syntax = "proto2";
package quickstep.serialization;
import "relational_operators/SortMergeRunOperator.proto";
-import "relational_operators/TextScanOperator.proto";
enum WorkOrderType {
AGGREGATION = 1;
@@ -39,8 +40,7 @@ enum WorkOrderType {
SORT_RUN_GENERATION = 16;
TABLE_GENERATOR = 17;
TEXT_SCAN = 18;
- TEXT_SPLIT = 19;
- UPDATE = 20;
+ UPDATE = 19;
}
message WorkOrder {
@@ -223,15 +223,12 @@ message TableGeneratorWorkOrder {
message TextScanWorkOrder {
extend WorkOrder {
// All required.
+ optional string filename = 301;
+ optional uint64 text_offset = 302;
+ optional uint64 text_segment_size = 303;
optional uint32 field_terminator = 304; // For one-byte char.
optional bool process_escape_sequences = 305;
optional int32 insert_destination_index = 306;
-
- // Either
- optional string filename = 307;
-
- // Or
- optional TextBlob text_blob = 308;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 489b666..da42b4d 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -42,7 +42,6 @@
#include "relational_operators/SortRunGenerationOperator.hpp"
#include "relational_operators/TableGeneratorOperator.hpp"
#include "relational_operators/TextScanOperator.hpp"
-#include "relational_operators/TextScanOperator.pb.h"
#include "relational_operators/UpdateOperator.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/StorageBlockInfo.hpp"
@@ -389,40 +388,16 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
}
case serialization::TEXT_SCAN: {
LOG(INFO) << "Creating TextScanWorkOrder";
- if (proto.HasExtension(serialization::TextScanWorkOrder::filename)) {
- return new TextScanWorkOrder(
- proto.query_id(),
- proto.GetExtension(serialization::TextScanWorkOrder::filename),
- proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
- proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
- query_context->getInsertDestination(
- proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
- storage_manager);
- } else {
- const serialization::TextBlob &text_blob_proto =
- proto.GetExtension(serialization::TextScanWorkOrder::text_blob);
-
- return new TextScanWorkOrder(
- proto.query_id(),
- text_blob_proto.blob_id(),
- text_blob_proto.size(),
- proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
- proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
- query_context->getInsertDestination(
- proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
- storage_manager);
- }
- }
- case serialization::TEXT_SPLIT: {
- LOG(INFO) << "Creating TextSplitWorkOrder";
- return new TextSplitWorkOrder(
+ return new TextScanWorkOrder(
proto.query_id(),
- proto.GetExtension(serialization::TextSplitWorkOrder::filename),
- proto.GetExtension(serialization::TextSplitWorkOrder::process_escape_sequences),
- storage_manager,
- proto.GetExtension(serialization::TextSplitWorkOrder::operator_index),
- shiftboss_client_id,
- bus);
+ proto.GetExtension(serialization::TextScanWorkOrder::filename),
+ proto.GetExtension(serialization::TextScanWorkOrder::text_offset),
+ proto.GetExtension(serialization::TextScanWorkOrder::text_segment_size),
+ proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
+ proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+ query_context->getInsertDestination(
+ proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+ storage_manager);
}
case serialization::UPDATE: {
LOG(INFO) << "Creating UpdateWorkOrder";
@@ -691,27 +666,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index));
}
case serialization::TEXT_SCAN: {
- if (!proto.HasExtension(serialization::TextScanWorkOrder::field_terminator) ||
- !proto.HasExtension(serialization::TextScanWorkOrder::process_escape_sequences) ||
- !proto.HasExtension(serialization::TextScanWorkOrder::insert_destination_index) ||
- !query_context.isValidInsertDestinationId(
- proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index))) {
- return false;
- }
-
- // Two fields are exclusive.
- if (proto.HasExtension(serialization::TextScanWorkOrder::filename) ==
- proto.HasExtension(serialization::TextScanWorkOrder::text_blob)) {
- return false;
- }
-
- return proto.HasExtension(serialization::TextScanWorkOrder::filename) ||
- proto.GetExtension(serialization::TextScanWorkOrder::text_blob).IsInitialized();
- }
- case serialization::TEXT_SPLIT: {
- return proto.HasExtension(serialization::TextSplitWorkOrder::filename) &&
- proto.HasExtension(serialization::TextSplitWorkOrder::process_escape_sequences) &&
- proto.HasExtension(serialization::TextSplitWorkOrder::operator_index);
+ return proto.HasExtension(serialization::TextScanWorkOrder::filename) &&
+ proto.HasExtension(serialization::TextScanWorkOrder::text_offset) &&
+ proto.HasExtension(serialization::TextScanWorkOrder::text_segment_size) &&
+ proto.HasExtension(serialization::TextScanWorkOrder::field_terminator) &&
+ proto.HasExtension(serialization::TextScanWorkOrder::process_escape_sequences) &&
+ proto.HasExtension(serialization::TextScanWorkOrder::insert_destination_index) &&
+ query_context.isValidInsertDestinationId(
+ proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index));
}
case serialization::UPDATE: {
return proto.HasExtension(serialization::UpdateWorkOrder::relation_id) &&
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index ef6fc2d..5860745 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -193,7 +193,6 @@ TEST_F(TextScanOperatorTest, ScanTest) {
input_filename,
'\t',
true,
- false,
*relation_,
output_destination_index));
http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/tests/text_scan_input.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_input.txt b/relational_operators/tests/text_scan_input.txt
index bcb76bf..51015bd 100644
--- a/relational_operators/tests/text_scan_input.txt
+++ b/relational_operators/tests/text_scan_input.txt
@@ -2,9 +2,5 @@
-1234567890 -1.2e-200 A twenty char string 1969-07-21 02:56:00 00:00:01.001 Another twenty chars
\N \N \N \N \N \N
\N \N \\N \N \N \\N
-\x34\062 \55\064\x32\56\65 \x7B\
-\t\ \\\e\s\c\a\p\e\d\x\b\n\x7d 1988-07-16\T00:00\:00\x2E0\x30\60\06001 00:00:00 'good\' \"bye"\r\n\
-\r\n\v\n\
-
-0 0.0 \\\\\
-\\\\\n 1970-01-01 0 s \\\\
+\x34\062 \55\064\x32\56\65 \x7B\n\t\ \\\e\s\c\a\p\e\d\x\b\n\x7d 1988-07-16\T00:00\:00\x2E0\x30\60\06001 00:00:00 'good\' \"bye"\r\n\n\r\n\v\n\n
+0 0.0 \\\\\n\\\\\n 1970-01-01 0 s \\\\
[3/3] incubator-quickstep git commit: Merge branch
'travis_sharedlibs' of
https://git-wip-us.apache.org/repos/asf/incubator-quickstep
Posted by na...@apache.org.
Merge branch 'travis_sharedlibs' of https://git-wip-us.apache.org/repos/asf/incubator-quickstep
Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/18c9dc18
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/18c9dc18
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/18c9dc18
Branch: refs/heads/travis_sharedlibs
Commit: 18c9dc18c211f8327b42bd898534b92d78184187
Parents: 4f8fdbe ef6a452
Author: Navneet Potti <na...@apache.org>
Authored: Thu Jun 9 15:02:42 2016 -0500
Committer: Navneet Potti <na...@apache.org>
Committed: Thu Jun 9 15:02:42 2016 -0500
----------------------------------------------------------------------
.travis.yml | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------