You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@quickstep.apache.org by ji...@apache.org on 2016/06/16 04:20:02 UTC

[03/20] incubator-quickstep git commit: Improved TextScanOperator.

Improved TextScanOperator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/commit/4f8fdbe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/tree/4f8fdbe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quickstep/diff/4f8fdbe8

Branch: refs/heads/adaptive-bloom-filters
Commit: 4f8fdbe8451aed1ad1c07a8badb5be85bee1ff57
Parents: eebb464
Author: Jianqiao Zhu <ji...@cs.wisc.edu>
Authored: Thu Jun 9 03:18:37 2016 -0500
Committer: Zuyu Zhang <zu...@apache.org>
Committed: Thu Jun 9 10:52:40 2016 -0700

----------------------------------------------------------------------
 query_optimizer/ExecutionGenerator.cpp          |   1 -
 relational_operators/CMakeLists.txt             |  23 +-
 relational_operators/TextScanOperator.cpp       | 818 ++++++-------------
 relational_operators/TextScanOperator.hpp       | 286 +++----
 relational_operators/WorkOrder.proto            |  15 +-
 relational_operators/WorkOrderFactory.cpp       |  72 +-
 .../tests/TextScanOperator_unittest.cpp         |   1 -
 relational_operators/tests/text_scan_input.txt  |   8 +-
 8 files changed, 384 insertions(+), 840 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/query_optimizer/ExecutionGenerator.cpp
----------------------------------------------------------------------
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 99c2a21..f9fd742 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -945,7 +945,6 @@ void ExecutionGenerator::convertCopyFrom(
               physical_plan->file_name(),
               physical_plan->column_delimiter(),
               physical_plan->escape_strings(),
-              FLAGS_parallelize_load,
               *output_relation,
               insert_destination_index));
   insert_destination_proto->set_relational_op_index(scan_operator_index);

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index d2693eb..eb73c07 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015-2016 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -16,9 +18,6 @@
 QS_PROTOBUF_GENERATE_CPP(relationaloperators_SortMergeRunOperator_proto_srcs
                          relationaloperators_SortMergeRunOperator_proto_hdrs
                          SortMergeRunOperator.proto)
-QS_PROTOBUF_GENERATE_CPP(relationaloperators_TextScanOperator_proto_srcs
-                         relationaloperators_TextScanOperator_proto_hdrs
-                         TextScanOperator.proto)
 QS_PROTOBUF_GENERATE_CPP(relationaloperators_WorkOrder_proto_srcs
                          relationaloperators_WorkOrder_proto_hdrs
                          WorkOrder.proto)
@@ -61,9 +60,6 @@ add_library(quickstep_relationaloperators_SortRunGenerationOperator SortRunGener
             SortRunGenerationOperator.hpp)
 add_library(quickstep_relationaloperators_TableGeneratorOperator TableGeneratorOperator.cpp TableGeneratorOperator.hpp)
 add_library(quickstep_relationaloperators_TextScanOperator TextScanOperator.cpp TextScanOperator.hpp)
-add_library(quickstep_relationaloperators_TextScanOperator_proto
-            ${relationaloperators_TextScanOperator_proto_srcs}
-            ${relationaloperators_TextScanOperator_proto_hdrs})
 add_library(quickstep_relationaloperators_UpdateOperator UpdateOperator.cpp UpdateOperator.hpp)
 add_library(quickstep_relationaloperators_WorkOrder ../empty_src.cpp WorkOrder.hpp)
 add_library(quickstep_relationaloperators_WorkOrderFactory WorkOrderFactory.cpp WorkOrderFactory.hpp)
@@ -360,27 +356,19 @@ target_link_libraries(quickstep_relationaloperators_TextScanOperator
                       glog
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelation
-                      quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_QueryContext
-                      quickstep_queryexecution_QueryExecutionMessages_proto
-                      quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_queryexecution_QueryExecutionUtil
                       quickstep_queryexecution_WorkOrdersContainer
                       quickstep_relationaloperators_RelationalOperator
-                      quickstep_relationaloperators_TextScanOperator_proto
                       quickstep_relationaloperators_WorkOrder
                       quickstep_storage_InsertDestination
-                      quickstep_storage_StorageBlob
-                      quickstep_storage_StorageBlockInfo
-                      quickstep_storage_StorageManager
-                      quickstep_threading_ThreadIDBasedMap
                       quickstep_types_Type
                       quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
+                      quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
                       quickstep_utility_Glob
                       quickstep_utility_Macros
-                      quickstep_utility_ThreadSafeQueue
                       tmb)
 target_link_libraries(quickstep_relationaloperators_UpdateOperator
                       glog
@@ -430,7 +418,6 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       quickstep_relationaloperators_SortRunGenerationOperator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
-                      quickstep_relationaloperators_TextScanOperator_proto
                       quickstep_relationaloperators_UpdateOperator
                       quickstep_relationaloperators_WorkOrder_proto
                       quickstep_storage_StorageBlockInfo
@@ -438,7 +425,6 @@ target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
                       tmb)
 target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
                       quickstep_relationaloperators_SortMergeRunOperator_proto
-                      quickstep_relationaloperators_TextScanOperator_proto
                       ${PROTOBUF_LIBRARY})
 
 # Module all-in-one library:
@@ -466,7 +452,6 @@ target_link_libraries(quickstep_relationaloperators
                       quickstep_relationaloperators_SortRunGenerationOperator
                       quickstep_relationaloperators_TableGeneratorOperator
                       quickstep_relationaloperators_TextScanOperator
-                      quickstep_relationaloperators_TextScanOperator_proto
                       quickstep_relationaloperators_UpdateOperator
                       quickstep_relationaloperators_WorkOrder
                       quickstep_relationaloperators_WorkOrderFactory

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/TextScanOperator.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index 5acecbf..d2fd0cd 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -20,124 +22,30 @@
 #include <algorithm>
 #include <cctype>
 #include <cstddef>
-#include <cstdint>
 #include <cstdio>
 #include <cstdlib>
-#include <cstring>
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
 
 #include "catalog/CatalogAttribute.hpp"
-#include "catalog/CatalogRelationSchema.hpp"
 #include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionMessages.pb.h"
-#include "query_execution/QueryExecutionUtil.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
-#include "relational_operators/TextScanOperator.pb.h"
 #include "storage/InsertDestination.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
-#include "storage/StorageManager.hpp"
-#include "threading/ThreadIDBasedMap.hpp"
 #include "types/Type.hpp"
 #include "types/TypedValue.hpp"
 #include "types/containers/Tuple.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "utility/Glob.hpp"
 
-#include "gflags/gflags.h"
 #include "glog/logging.h"
 
 #include "tmb/id_typedefs.h"
-#include "tmb/message_bus.h"
-#include "tmb/tagged_message.h"
-
-using std::isxdigit;
-using std::size_t;
-using std::sscanf;
-using std::string;
 
 namespace quickstep {
 
-DEFINE_uint64(textscan_split_blob_size, 2,
-              "Size of blobs in number of slots the input text files "
-              "are split into in the TextScanOperator.");
-
-// Check if blob size is positive.
-static bool ValidateTextScanSplitBlobSize(const char *flagname,
-                                          std::uint64_t blob_size) {
-  if (blob_size == 0) {
-    LOG(ERROR) << "--" << flagname << " must be greater than 0";
-    return false;
-  }
-
-  return true;
-}
-
-static const volatile bool text_scan_split_blob_size_dummy = gflags::RegisterFlagValidator(
-    &FLAGS_textscan_split_blob_size, &ValidateTextScanSplitBlobSize);
-
-namespace {
-
-// Detect whether '*search_string' contains a row-terminator (either line-feed
-// or carriage-return + line-feed) immediately before 'end_pos'. If
-// 'process_escape_sequences' is true, this function will also eliminate
-// false-positives from an escaped row-terminator. Returns the number of
-// characters in the row-terminator, or 0 if no terminator is detected.
-inline unsigned DetectRowTerminator(const char *search_string,
-                                    std::size_t end_pos,
-                                    const bool process_escape_sequences) {
-  if (end_pos == 0) {
-    // Empty string.
-    return 0;
-  }
-
-  if (search_string[end_pos - 1] != '\n') {
-    // String doesn't end in newline.
-    return 0;
-  }
-
-  if (end_pos == 1) {
-    // String is the single newline character.
-    return 1;
-  }
-
-  const bool have_carriage_return = (search_string[end_pos - 2] == '\r');
-  if (have_carriage_return && (end_pos == 2)) {
-    // String is CR-LF and nothing else.
-    return 2;
-  }
-
-  std::size_t backslashes = 0;
-  // Count consecutive backslashes preceding the terminator. If there is an odd
-  // number of backslashes, then the terminator is escaped and doesn't count as
-  // a real terminator. If there is an even number of backslashes, then each
-  // pair is an escaped backslash literal and the terminator still counts.
-  if (process_escape_sequences) {
-    end_pos = end_pos - 2 - have_carriage_return;
-    while (end_pos != 0) {
-      if (search_string[end_pos] == '\\') {
-        ++backslashes;
-        --end_pos;
-        if ((end_pos == 0) && (search_string[0] == '\\')) {
-          // Don't forget to count a backslash at the very beginning of a string.
-          ++backslashes;
-        }
-      } else {
-        break;
-      }
-    }
-  }
-
-  if (backslashes & 0x1) {
-    return 0;
-  } else {
-    return 1 + have_carriage_return;
-  }
-}
-
-}  // namespace
-
 bool TextScanOperator::getAllWorkOrders(
     WorkOrdersContainer *container,
     QueryContext *query_context,
@@ -155,116 +63,50 @@ bool TextScanOperator::getAllWorkOrders(
   InsertDestination *output_destination =
       query_context->getInsertDestination(output_destination_index_);
 
-  if (parallelize_load_) {
-    // Parallel implementation: Split work orders are generated for each file
-    // being bulk-loaded. (More than one file can be loaded, because we support
-    // glob() semantics in file name.) These work orders read the input file,
-    // and split them in the blobs that can be parsed independently.
-    if (blocking_dependencies_met_) {
-      if (!work_generated_) {
-        // First, generate text-split work orders.
-        for (const auto &file : files) {
-          container->addNormalWorkOrder(
-              new TextSplitWorkOrder(query_id_,
-                                     file,
-                                     process_escape_sequences_,
-                                     storage_manager,
-                                     op_index_,
-                                     scheduler_client_id,
-                                     bus),
-              op_index_);
-          ++num_split_work_orders_;
-        }
-        work_generated_ = true;
-        return false;
-      } else {
-        // Check if there are blobs to parse.
-        while (!text_blob_queue_.empty()) {
-          const TextBlob blob_work = text_blob_queue_.popOne();
-          container->addNormalWorkOrder(
-              new TextScanWorkOrder(query_id_,
-                                    blob_work.blob_id,
-                                    blob_work.size,
-                                    field_terminator_,
-                                    process_escape_sequences_,
-                                    output_destination,
-                                    storage_manager),
-              op_index_);
-        }
-        // Done if all split work orders are completed, and no blobs are left to
-        // process.
-        return num_done_split_work_orders_.load(std::memory_order_acquire) == num_split_work_orders_ &&
-               text_blob_queue_.empty();
-      }
-    }
-    return false;
-  } else {
-    // Serial implementation.
-    if (blocking_dependencies_met_ && !work_generated_) {
-      for (const auto &file : files) {
+  // Text segment size set to 256KB.
+  constexpr std::size_t kTextSegmentSize = 0x40000u;
+
+  if (blocking_dependencies_met_ && !work_generated_) {
+    for (const std::string &file : files) {
+      // Use the standard C library to retrieve the file size.
+      FILE *fp = std::fopen(file.c_str(), "rb");
+      std::fseek(fp, 0, SEEK_END);
+      const std::size_t file_size = std::ftell(fp);
+      std::fclose(fp);
+
+      std::size_t text_offset = 0;
+      while (text_offset < file_size) {
         container->addNormalWorkOrder(
             new TextScanWorkOrder(query_id_,
                                   file,
+                                  text_offset,
+                                  std::min(kTextSegmentSize, file_size - text_offset),
                                   field_terminator_,
                                   process_escape_sequences_,
                                   output_destination,
                                   storage_manager),
             op_index_);
+        text_offset += kTextSegmentSize;
       }
-      work_generated_ = true;
     }
-    return work_generated_;
-  }
-}
-
-void TextScanOperator::receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) {
-  switch (msg.type()) {
-    case kSplitWorkOrderCompletionMessage: {
-      num_done_split_work_orders_.fetch_add(1, std::memory_order_release);
-      break;
-    }
-    case kNewTextBlobMessage: {
-      serialization::TextBlob proto;
-      CHECK(proto.ParseFromArray(msg.payload(), msg.payload_size()));
-      text_blob_queue_.push(TextBlob(proto.blob_id(), proto.size()));
-      break;
-    }
-    default:
-      LOG(ERROR) << "Unknown feedback message type for TextScanOperator";
+    work_generated_ = true;
   }
+  return work_generated_;
 }
 
 TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
                                      const std::string &filename,
+                                     const std::size_t text_offset,
+                                     const std::size_t text_segment_size,
                                      const char field_terminator,
                                      const bool process_escape_sequences,
                                      InsertDestination *output_destination,
                                      StorageManager *storage_manager)
     : WorkOrder(query_id),
-      is_file_(true),
       filename_(filename),
+      text_offset_(text_offset),
+      text_segment_size_(text_segment_size),
       field_terminator_(field_terminator),
-      text_blob_(0),
-      text_size_(0),
-      process_escape_sequences_(process_escape_sequences),
-      output_destination_(output_destination),
-      storage_manager_(storage_manager) {
-  DCHECK(output_destination_ != nullptr);
-  DCHECK(storage_manager_ != nullptr);
-}
-
-TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
-                                     const block_id text_blob,
-                                     const std::size_t text_size,
-                                     const char field_terminator,
-                                     const bool process_escape_sequences,
-                                     InsertDestination *output_destination,
-                                     StorageManager *storage_manager)
-    : WorkOrder(query_id),
-      is_file_(false),
-      field_terminator_(field_terminator),
-      text_blob_(text_blob),
-      text_size_(text_size),
       process_escape_sequences_(process_escape_sequences),
       output_destination_(output_destination),
       storage_manager_(storage_manager) {
@@ -274,439 +116,293 @@ TextScanWorkOrder::TextScanWorkOrder(const std::size_t query_id,
 
 void TextScanWorkOrder::execute() {
   const CatalogRelationSchema &relation = output_destination_->getRelation();
+  std::vector<Tuple> tuples;
 
-  string current_row_string;
-  if (is_file_) {
-    FILE *file = std::fopen(filename_.c_str(), "r");
-    if (file == nullptr) {
-      throw TextScanReadError(filename_);
-    }
+  constexpr std::size_t kSmallBufferSize = 0x4000;
+  char *buffer = reinterpret_cast<char *>(malloc(std::max(text_segment_size_, kSmallBufferSize)));
 
-    bool have_row = false;
-    do {
-      current_row_string.clear();
-      have_row = readRowFromFile(file, &current_row_string);
-      if (have_row) {
-        Tuple tuple = parseRow(current_row_string, relation);
-        output_destination_->insertTupleInBatch(tuple);
-      }
-    } while (have_row);
-
-    std::fclose(file);
-  } else {
-    BlobReference blob = storage_manager_->getBlob(text_blob_);
-    const char *blob_pos = static_cast<const char*>(blob->getMemory());
-    const char *blob_end = blob_pos + text_size_;
-    bool have_row = false;
-    do {
-      current_row_string.clear();
-      have_row = readRowFromBlob(&blob_pos, blob_end, &current_row_string);
-      if (have_row) {
-        Tuple tuple = parseRow(current_row_string, relation);
-        output_destination_->insertTupleInBatch(tuple);
-      }
-    } while (have_row);
-
-    // Drop the consumed blob produced by TextSplitWorkOrder.
-    blob.release();
-    storage_manager_->deleteBlockOrBlobFile(text_blob_);
+  // Read text segment into buffer.
+  FILE *file = std::fopen(filename_.c_str(), "rb");
+  std::fseek(file, text_offset_, SEEK_SET);
+  std::size_t bytes_read = std::fread(buffer, 1, text_segment_size_, file);
+  if (bytes_read != text_segment_size_) {
+    throw TextScanReadError(filename_);
   }
-}
 
-char TextScanWorkOrder::ParseOctalLiteral(const std::string &row_string,
-                                          std::size_t *start_pos) {
-  const std::size_t stop_pos = std::min(row_string.length(), *start_pos + 3);
-
-  int value = 0;
-  for (; *start_pos < stop_pos; ++*start_pos) {
-    int char_value = row_string[*start_pos] - '0';
-    if ((char_value >= 0) && (char_value < 8)) {
-      value = value * 8 + char_value;
-    } else {
-      return value;
+  // Locate the first newline character.
+  const char *buffer_end = buffer + text_segment_size_;
+  const char *row_ptr = buffer;
+  if (text_offset_ != 0) {
+    while (row_ptr < buffer_end && *row_ptr != '\n') {
+      ++row_ptr;
     }
+  } else {
+    --row_ptr;
   }
 
-  return value;
-}
-
-char TextScanWorkOrder::ParseHexLiteral(const std::string &row_string,
-                                        std::size_t *start_pos) {
-  const std::size_t stop_pos = std::min(row_string.length(), *start_pos + 2);
+  if (row_ptr >= buffer_end) {
+    // This block does not even contain a newline character.
+    return;
+  }
 
-  int value = 0;
-  for (; *start_pos < stop_pos; ++*start_pos) {
-    if (!std::isxdigit(row_string[*start_pos])) {
-      break;
-    }
+  // Locate the last newline character.
+  const char *end_ptr = buffer_end - 1;
+  while (end_ptr > row_ptr && *end_ptr != '\n') {
+    --end_ptr;
+  }
 
-    int char_value;
-    if (std::isdigit(row_string[*start_pos])) {
-      char_value = row_string[*start_pos] - '0';
-    } else if (std::islower(row_string[*start_pos])) {
-      char_value = row_string[*start_pos] - 'a' + 10;
+  // Advance both row_ptr and end_ptr by 1.
+  ++row_ptr;
+  ++end_ptr;
+  // Now row_ptr is pointing to the first character RIGHT AFTER the FIRST newline
+  // character in this text segment, and end_ptr is pointing to the first character
+  // RIGHT AFTER the LAST newline character in this text segment.
+
+  // Process the tuples which are between the first newline character and the
+  // last newline character.
+  while (row_ptr < end_ptr) {
+    if (*row_ptr == '\r' || *row_ptr == '\n') {
+      // Skip empty lines.
+      ++row_ptr;
     } else {
-      char_value = row_string[*start_pos] - 'A' + 10;
+      tuples.emplace_back(parseRow(&row_ptr, relation));
     }
-
-    value = value * 16 + char_value;
   }
 
-  return value;
-}
+  // Process the tuple that is right after the last newline character.
+  // NOTE(jianqiao): dynamic_read_size is trying to balance between the cases
+  // that the last tuple is very small / very large.
+  std::size_t dynamic_read_size = 1024;
+  std::string row_string;
+  std::fseek(file, text_offset_ + (end_ptr - buffer), SEEK_SET);
+  bool has_reached_end = false;
+  do {
+    bytes_read = std::fread(buffer, 1, dynamic_read_size, file);
+    std::size_t bytes_to_copy = bytes_read;
 
-bool TextScanWorkOrder::readRowFromFile(FILE *file, std::string *row_string) const {
-  // Read up to 1023 chars + null-terminator at a time.
-  static constexpr std::size_t kRowBufferSize = 1024;
-  char row_buffer[kRowBufferSize];
-  for (;;) {
-    char *read_string = std::fgets(row_buffer, sizeof(row_buffer), file);
-    if (read_string == nullptr) {
-      if (std::feof(file)) {
-        if (row_string->empty()) {
-          return false;
-        } else {
-          throw TextScanFormatError("File ended without delimiter");
-        }
-      } else {
-        throw TextScanReadError(filename_);
+    for (std::size_t i = 0; i < bytes_read; ++i) {
+      if (buffer[i] == '\n') {
+        bytes_to_copy = i + 1;
+        has_reached_end = true;
+        break;
       }
     }
-
-    // Append the contents of the buffer to '*row_string', and see if we've
-    // reached a genuine row-terminator yet.
-    row_string->append(row_buffer);
-    if (removeRowTerminator(row_string)) {
-      row_string->push_back(field_terminator_);
-      return true;
+    if (!has_reached_end && bytes_read != dynamic_read_size) {
+      has_reached_end = true;
     }
-  }
-}
 
-bool TextScanWorkOrder::readRowFromBlob(const char **start_pos,
-                                        const char *end_pos,
-                                        std::string *row_string) const {
-  while (*start_pos != end_pos) {
-    const char *next_newline = static_cast<const char*>(std::memchr(
-        *start_pos,
-        '\n',
-        end_pos - *start_pos));
-
-    if (next_newline == nullptr) {
-      throw TextScanFormatError("File ended without delimiter");
-    }
+    row_string.append(buffer, bytes_to_copy);
+    dynamic_read_size = std::min(dynamic_read_size * 2, kSmallBufferSize);
+  } while (!has_reached_end);
 
-    // Append the blob's contents through the next newline to '*row_string',
-    // and see if we've reached a genuine row-terminator yet.
-    row_string->append(*start_pos, next_newline - *start_pos + 1);
-    *start_pos = next_newline + 1;
-    if (removeRowTerminator(row_string)) {
-      row_string->push_back(field_terminator_);
-      return true;
+  if (!row_string.empty()) {
+    if (row_string.back() != '\n') {
+      row_string.push_back('\n');
     }
+    row_ptr = row_string.c_str();
+    tuples.emplace_back(parseRow(&row_ptr, relation));
   }
 
-  if (row_string->empty()) {
-    return false;
-  } else {
-    throw TextScanFormatError("File ended without delimiter");
-  }
-}
-
-bool TextScanWorkOrder::removeRowTerminator(std::string *row_string) const {
-  unsigned row_term_chars = DetectRowTerminator(row_string->c_str(),
-                                                row_string->length(),
-                                                process_escape_sequences_);
-  if (row_term_chars == 0) {
-    return false;
-  } else {
-    row_string->resize(row_string->length() - row_term_chars);
-    return true;
-  }
-}
-
-bool TextScanWorkOrder::extractFieldString(const std::string &row_string,
-                                           std::size_t *start_pos,
-                                           std::string *field_string) const {
-  // Check for NULL literal string.
-  if (process_escape_sequences_
-      && (row_string.length() - *start_pos >= 3)
-      && (row_string[*start_pos] == '\\')
-      && (row_string[*start_pos + 1] == 'N')
-      && (row_string[*start_pos + 2] == field_terminator_)) {
-    *start_pos += 3;
-    return false;
-  }
-
-  // Scan up until terminator, expanding backslashed escape sequences as we go.
-  std::size_t terminator_pos = row_string.find(field_terminator_, *start_pos);
-  std::size_t scan_pos = *start_pos;
-
-  if (process_escape_sequences_) {
-    for (;;) {
-      std::size_t backslash_pos = row_string.find('\\', scan_pos);
-      if ((backslash_pos == std::string::npos) || (backslash_pos >= terminator_pos)) {
-        // No more backslashes, or the next backslash is beyond the field
-        // terminator.
-        break;
-      }
-
-      // Copy up to the backslash.
-      field_string->append(row_string, scan_pos, backslash_pos - scan_pos);
-
-      if (backslash_pos + 1 == terminator_pos) {
-        // The terminator we found was escaped by a backslash, so append the
-        // literal terminator and re-scan for the next terminator character.
-        field_string->push_back(field_terminator_);
-        scan_pos = terminator_pos + 1;
-        terminator_pos = row_string.find(field_terminator_, scan_pos);
-        continue;
+  std::fclose(file);
+  free(buffer);
+
+  // Store the tuples in a ColumnVectorsValueAccessor for bulk insert.
+  ColumnVectorsValueAccessor column_vectors;
+  std::size_t attr_id = 0;
+  for (const auto &attribute : relation) {
+    const Type &attr_type = attribute.getType();
+    if (attr_type.isVariableLength()) {
+      std::unique_ptr<IndirectColumnVector> column(
+          new IndirectColumnVector(attr_type, tuples.size()));
+      for (const auto &tuple : tuples) {
+        column->appendTypedValue(tuple.getAttributeValue(attr_id));
       }
-
-      // Expand escape sequence.
-      switch (row_string[backslash_pos + 1]) {
-        case '0':  // Fallthrough for octal digits.
-        case '1':
-        case '2':
-        case '3':
-        case '4':
-        case '5':
-        case '6':
-        case '7':
-          // Octal char literal.
-          scan_pos = backslash_pos + 1;
-          field_string->push_back(ParseOctalLiteral(row_string, &scan_pos));
-          break;
-        case 'N': {
-          // Null literal after some other column data.
-          throw TextScanFormatError(
-              "Null indicator '\\N' encountered in text scan mixed in with "
-              "other column data.");
-        }
-        case '\\':
-          // Backslash.
-          field_string->push_back('\\');
-          scan_pos = backslash_pos + 2;
-          break;
-        case 'b':
-          // Backspace.
-          field_string->push_back('\b');
-          scan_pos = backslash_pos + 2;
-          break;
-        case 'f':
-          // Form-feed.
-          field_string->push_back('\f');
-          scan_pos = backslash_pos + 2;
-          break;
-        case 'n':
-          // Newline.
-          field_string->push_back('\n');
-          scan_pos = backslash_pos + 2;
-          break;
-        case 'r':
-          // Carriage return.
-          field_string->push_back('\r');
-          scan_pos = backslash_pos + 2;
-          break;
-        case 't':
-          // Tab.
-          field_string->push_back('\t');
-          scan_pos = backslash_pos + 2;
-          break;
-        case 'v':
-          // Vertical tab.
-          field_string->push_back('\v');
-          scan_pos = backslash_pos + 2;
-          break;
-        case 'x':
-          if ((backslash_pos + 2 < row_string.length()) && std::isxdigit(row_string[backslash_pos + 2])) {
-            // Hexidecimal char literal.
-            scan_pos = backslash_pos + 2;
-            field_string->push_back(ParseHexLiteral(row_string, &scan_pos));
-          } else {
-            // Just an escaped 'x' with no hex digits.
-            field_string->push_back('x');
-            scan_pos = backslash_pos + 2;
-          }
-          break;
-        default:
-          // Append escaped character as-is.
-          field_string->push_back(row_string[backslash_pos + 1]);
-          scan_pos = backslash_pos + 2;
-          break;
+      column_vectors.addColumn(column.release());
+    } else {
+      std::unique_ptr<NativeColumnVector> column(
+          new NativeColumnVector(attr_type, tuples.size()));
+      for (const auto &tuple : tuples) {
+        column->appendTypedValue(tuple.getAttributeValue(attr_id));
       }
+      column_vectors.addColumn(column.release());
     }
+    ++attr_id;
   }
 
-  DCHECK_NE(terminator_pos, std::string::npos);
-  field_string->append(row_string, scan_pos, terminator_pos - scan_pos);
-  *start_pos = terminator_pos + 1;
-  return true;
+  // Bulk insert the tuples.
+  output_destination_->bulkInsertTuples(&column_vectors);
 }
 
-Tuple TextScanWorkOrder::parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const {
+Tuple TextScanWorkOrder::parseRow(const char **row_ptr,
+                                  const CatalogRelationSchema &relation) const {
   std::vector<TypedValue> attribute_values;
 
-  std::size_t pos = 0;
+  bool is_null_literal;
+  bool has_reached_end_of_line = false;
   std::string value_str;
-  CatalogRelationSchema::const_iterator attr_it = relation.begin();
-  while (pos < row_string.length()) {
-    if (attr_it == relation.end()) {
-      throw TextScanFormatError("Row has too many fields");
+  for (const auto &attr : relation) {
+    if (has_reached_end_of_line) {
+      throw TextScanFormatError("Row has too few fields");
     }
 
     value_str.clear();
-    if (extractFieldString(row_string, &pos, &value_str)) {
-      attribute_values.emplace_back();
-      if (!attr_it->getType().parseValueFromString(value_str, &(attribute_values.back()))) {
-        throw TextScanFormatError("Failed to parse value");
-      }
-    } else {
+    extractFieldString(row_ptr,
+                       &is_null_literal,
+                       &has_reached_end_of_line,
+                       &value_str);
+
+    if (is_null_literal) {
       // NULL literal.
-      if (!attr_it->getType().isNullable()) {
+      if (!attr.getType().isNullable()) {
         throw TextScanFormatError(
             "NULL literal '\\N' was specified for a column with a "
             "non-nullable Type");
       }
-
-      attribute_values.emplace_back(attr_it->getType().makeNullValue());
+      attribute_values.emplace_back(attr.getType().makeNullValue());
+    } else {
+      attribute_values.emplace_back();
+      if (!attr.getType().parseValueFromString(value_str, &(attribute_values.back()))) {
+        throw TextScanFormatError("Failed to parse value");
+      }
     }
-
-    ++attr_it;
   }
 
-  if (attr_it != relation.end()) {
-    throw TextScanFormatError("Row has too few fields");
+  if (!has_reached_end_of_line) {
+    throw TextScanFormatError("Row has too many fields");
   }
 
   return Tuple(std::move(attribute_values));
 }
 
-void TextSplitWorkOrder::execute() {
-  std::FILE *file = std::fopen(filename_.c_str(), "r");
-  if (!file) {
-    throw TextScanReadError(filename_);
-  }
-
-  bool eof = false;
-  do {
-    // Allocate new blob, if current is empty.
-    if (0 == remainingBlobBytes()) {
-      allocateBlob();
-    }
-
-    // Read the into the unwritten part of blob.
-    std::size_t bytes =
-        std::fread(writeableBlobAddress(), 1, remainingBlobBytes(), file);
-    eof = bytes < remainingBlobBytes();
-    written_ += bytes;
-
-    // Write the current blob to queue for processing.
-    sendBlobInfoToOperator(!eof /* write_row_aligned */);
-  } while (!eof);
-
-  std::fclose(file);
+void TextScanWorkOrder::extractFieldString(const char **field_ptr,
+                                           bool *is_null_literal,
+                                           bool *has_reached_end_of_line,
+                                           std::string *field_string) const {
+  const char *cur_ptr = *field_ptr;
+  *is_null_literal = false;
 
-  // Notify the operator about the completion of this Work Order.
-  FeedbackMessage msg(TextScanOperator::kSplitWorkOrderCompletionMessage,
-                      operator_index_,
-                      nullptr /* payload */,
-                      0 /* payload_size */,
-                      false /* ownership */);
-  SendFeedbackMessage(bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, msg);
-}
+  // Check for NULL literal string.
+  if (process_escape_sequences_ && cur_ptr[0] == '\\' && cur_ptr[1] == 'N') {
+    cur_ptr += 2;
 
-// Allocate new blob.
-void TextSplitWorkOrder::allocateBlob() {
-  text_blob_id_ = storage_manager_->createBlob(FLAGS_textscan_split_blob_size);
-  text_blob_ = storage_manager_->getBlobMutable(text_blob_id_);
-  blob_size_ = text_blob_->size();
-  written_ = 0;
-}
+    // Skip '\r'
+    if (*cur_ptr == '\r') {
+      ++cur_ptr;
+    }
 
-// Find the last row terminator in the blob.
-std::size_t TextSplitWorkOrder::findLastRowTerminator() {
-  std::size_t found = 0;
-  const char *blob = static_cast<const char *>(text_blob_->getMemory());
-
-  for (std::size_t index = written_;
-       index != 0;
-       --index) {
-    if (DetectRowTerminator(blob, index, process_escape_sequences_)) {
-      found = index;
-      break;
+    const char c = *cur_ptr;
+    if (c == field_terminator_ || c == '\n') {
+      *is_null_literal = true;
+      *has_reached_end_of_line = (c == '\n');
+      *field_ptr = cur_ptr + 1;
+      return;
     }
   }
 
-  // TODO(quickstep-team): Design a way to handle long rows that are larger than
-  // the configured blob size.
-  CHECK_NE(0u, found) << "No row terminator found in " << FLAGS_textscan_split_blob_size
-                      << "-slot chunk of " << filename_;
-  return found;
-}
+  // Not a NULL literal string, rewind cur_ptr to the start position for parsing.
+  cur_ptr = *field_ptr;
 
-void TextSplitWorkOrder::sendBlobInfoToOperator(const bool write_row_aligned) {
-  std::size_t text_len = written_;
-  std::string residue;
-  if (write_row_aligned) {
-    // Find last row terminator in current blob.
-    text_len = findLastRowTerminator();
-
-    // Copy the residual bytes after the last row terminator.
-    residue = std::string(
-        static_cast<char *>(text_blob_->getMemoryMutable()) + text_len,
-        written_ - text_len);
-  }
+  if (!process_escape_sequences_) {
+    // Simply copy until field_terminator or '\n'.
+    for (;; ++cur_ptr) {
+      const char c = *cur_ptr;
+      if (c == field_terminator_) {
+        *has_reached_end_of_line = false;
+        break;
+      } else if (c == '\n') {
+        *has_reached_end_of_line = true;
+        break;
+      }
 
-  // Notify the operator for the split-up blob.
-  serialization::TextBlob proto;
-  proto.set_blob_id(text_blob_id_);
-  proto.set_size(text_len);
-
-  const std::size_t payload_size = proto.ByteSize();
-  // NOTE(zuyu): 'payload' gets released by FeedbackMessage's destructor.
-  char *payload = static_cast<char *>(std::malloc(payload_size));
-  CHECK(proto.SerializeToArray(payload, payload_size));
-
-  const tmb::client_id worker_thread_client_id = ClientIDMap::Instance()->getValue();
-  FeedbackMessage feedback_msg(TextScanOperator::kNewTextBlobMessage,
-                               operator_index_,
-                               payload,
-                               payload_size);
-  SendFeedbackMessage(bus_, worker_thread_client_id, scheduler_client_id_, feedback_msg);
-
-  // Notify Foreman for the avaiable work order on the blob.
-  serialization::WorkOrdersAvailableMessage message_proto;
-  message_proto.set_operator_index(operator_index_);
-
-  // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
-  const size_t message_proto_length = message_proto.ByteSize();
-  char *message_proto_bytes = static_cast<char*>(std::malloc(message_proto_length));
-  CHECK(message_proto.SerializeToArray(message_proto_bytes, message_proto_length));
-
-  tmb::TaggedMessage tagged_message(static_cast<const void *>(message_proto_bytes),
-                                    message_proto_length,
-                                    kWorkOrdersAvailableMessage);
-  std::free(message_proto_bytes);
-
-  // Send new work order available message to Foreman.
-  const tmb::MessageBus::SendStatus send_status =
-      QueryExecutionUtil::SendTMBMessage(
-          bus_,
-          worker_thread_client_id,
-          scheduler_client_id_,
-          std::move(tagged_message));
-  CHECK(send_status == tmb::MessageBus::SendStatus::kOK) << "Message could not "
-      "be sent from thread with TMB client ID "
-      << worker_thread_client_id << " to Foreman with TMB client "
-      "ID " << scheduler_client_id_;
-
-  if (residue.size()) {
-    // Allocate new blob, and copy residual bytes from last blob.
-    allocateBlob();
-    std::memcpy(writeableBlobAddress(), residue.data(), residue.size());
-    written_ += residue.size();
+      // Ignore '\r'
+      if (c != '\r') {
+        field_string->push_back(c);
+      }
+    }
+  } else {
+    for (;; ++cur_ptr) {
+      const char c = *cur_ptr;
+      if (c == '\\') {
+        ++cur_ptr;
+        const char first_escaped_character = *cur_ptr;
+        switch (first_escaped_character) {
+          case '0':  // Fallthrough for octal digits.
+          case '1':
+          case '2':
+          case '3':
+          case '4':
+          case '5':
+          case '6':
+          case '7':
+            field_string->push_back(ParseOctalLiteral(&cur_ptr));
+            break;
+          case 'N': {
+            // Null literal after some other column data.
+            throw TextScanFormatError(
+                "Null indicator '\\N' encountered in text scan mixed in with "
+                "other column data.");
+          }
+          case '\\':
+            // Backslash.
+            field_string->push_back('\\');
+            break;
+          case 'b':
+            // Backspace.
+            field_string->push_back('\b');
+            break;
+          case 'f':
+            // Form-feed.
+            field_string->push_back('\f');
+            break;
+          case 'n':
+            // Newline.
+            field_string->push_back('\n');
+            break;
+          case 'r':
+            // Carriage return.
+            field_string->push_back('\r');
+            break;
+          case 't':
+            // Tab.
+            field_string->push_back('\t');
+            break;
+          case 'v':
+            // Vertical tab.
+            field_string->push_back('\v');
+            break;
+          case 'x':
+            if (std::isxdigit(cur_ptr[1])) {
+              // Hexadecimal char literal.
+              ++cur_ptr;
+              field_string->push_back(ParseHexLiteral(&cur_ptr));
+            } else {
+              // Just an escaped 'x' with no hex digits.
+              field_string->push_back('x');
+            }
+            break;
+          case '\n':
+            throw TextScanFormatError(
+                "Backslash line splicing is not supported.");
+          default:
+            // Append escaped character as-is.
+            field_string->push_back(first_escaped_character);
+            break;
+        }
+      } else if (c == field_terminator_) {
+        *has_reached_end_of_line = false;
+        break;
+      } else if (c == '\n') {
+        *has_reached_end_of_line = true;
+        break;
+      } else {
+        if (c != '\r') {
+          // Ignore '\r'
+          field_string->push_back(c);
+        }
+      }
+    }
   }
+  *field_ptr = cur_ptr + 1;
 }
 
 }  // namespace quickstep

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/TextScanOperator.hpp
----------------------------------------------------------------------
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index 3cda65b..d73e7dd 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015-2016 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -18,26 +20,18 @@
 #ifndef QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
 #define QUICKSTEP_RELATIONAL_OPERATORS_TEXT_SCAN_OPERATOR_HPP_
 
-#include <atomic>
+#include <cctype>
 #include <cstddef>
-#include <cstdint>
-#include <cstdio>
 #include <exception>
 #include <string>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
-#include "query_execution/QueryExecutionTypedefs.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
-#include "storage/StorageBlob.hpp"
-#include "storage/StorageBlockInfo.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/Macros.hpp"
-#include "utility/ThreadSafeQueue.hpp"
-
-#include "glog/logging.h"
 
 #include "tmb/id_typedefs.h"
 
@@ -98,26 +92,11 @@ class TextScanFormatError : public std::exception {
 };
 
 /**
- * @brief A structure for text data blobs.
- */
-struct TextBlob {
-  TextBlob(const block_id text_blob_id, const std::size_t text_size)
-      : blob_id(text_blob_id), size(text_size) {}
-  block_id blob_id;
-  std::size_t size;
-};
-
-/**
  * @brief An operator which reads tuples from a text file and inserts them into
  *        a relation.
  **/
 class TextScanOperator : public RelationalOperator {
  public:
-  enum FeedbackMessageType : WorkOrder::FeedbackMessageType {
-    kNewTextBlobMessage,
-    kSplitWorkOrderCompletionMessage,
-  };
-
   /**
    * @brief Constructor
    *
@@ -130,29 +109,22 @@ class TextScanOperator : public RelationalOperator {
    *        the text file.
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
-   * @param parallelize_load Parallelize the load process by th spliting file
-   *        into blobs, and generating separate work-orders for each of them.
    * @param output_relation The output relation.
    * @param output_destination_index The index of the InsertDestination in the
    *        QueryContext to insert tuples.
    **/
-  TextScanOperator(
-      const std::size_t query_id,
-      const std::string &file_pattern,
-      const char field_terminator,
-      const bool process_escape_sequences,
-      const bool parallelize_load,
-      const CatalogRelation &output_relation,
-      const QueryContext::insert_destination_id output_destination_index)
+  TextScanOperator(const std::size_t query_id,
+                   const std::string &file_pattern,
+                   const char field_terminator,
+                   const bool process_escape_sequences,
+                   const CatalogRelation &output_relation,
+                   const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
         file_pattern_(file_pattern),
         field_terminator_(field_terminator),
         process_escape_sequences_(process_escape_sequences),
-        parallelize_load_(parallelize_load),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
-        num_done_split_work_orders_(0),
-        num_split_work_orders_(0),
         work_generated_(false) {}
 
   ~TextScanOperator() override {}
@@ -171,23 +143,14 @@ class TextScanOperator : public RelationalOperator {
     return output_relation_.getID();
   }
 
-  void receiveFeedbackMessage(const WorkOrder::FeedbackMessage &msg) override;
-
  private:
   const std::string file_pattern_;
   const char field_terminator_;
   const bool process_escape_sequences_;
-  const bool parallelize_load_;
 
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
 
-  ThreadSafeQueue<TextBlob> text_blob_queue_;
-  std::atomic<std::uint32_t> num_done_split_work_orders_;
-  std::uint32_t num_split_work_orders_;
-
-  // Indicates if work order to load file is generated for non-parallel load, and
-  // if work order to split file to blobs is generated for parallel load.
   bool work_generated_;
 
   DISALLOW_COPY_AND_ASSIGN(TextScanOperator);
@@ -203,7 +166,9 @@ class TextScanWorkOrder : public WorkOrder {
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param filename The name of the text file to bulk insert.
-   * @param field_terminator The string which separates attribute values in
+   * @param text_offset The start position in the text file to start text scan.
+   * @param text_segment_size The size of text segment to be scanned.
+   * @param field_terminator The character which separates attribute values in
    *        the text file.
    * @param process_escape_sequences Whether to decode escape sequences in the
    *        text file.
@@ -213,28 +178,8 @@ class TextScanWorkOrder : public WorkOrder {
   TextScanWorkOrder(
       const std::size_t query_id,
       const std::string &filename,
-      const char field_terminator,
-      const bool process_escape_sequences,
-      InsertDestination *output_destination,
-      StorageManager *storage_manager);
-
-  /**
-   * @brief Constructor.
-   *
-   * @param query_id The ID of the query to which this WorkOrder belongs.
-   * @param text_blob Blob ID containing the data to be scanned.
-   * @param text_size Size of the data in the blob.
-   * @param field_terminator The character which separates attribute values in
-   *        the text file.
-   * @param process_escape_sequences Whether to decode escape sequences in the
-   *        text file.
-   * @param output_destination The InsertDestination to write the read tuples.
-   * @param storage_manager The StorageManager to use.
-   */
-  TextScanWorkOrder(
-      const std::size_t query_id,
-      const block_id text_blob,
-      const std::size_t text_size,
+      const std::size_t text_offset,
+      const std::size_t text_segment_size,
       const char field_terminator,
       const bool process_escape_sequences,
       InsertDestination *output_destination,
@@ -255,141 +200,106 @@ class TextScanWorkOrder : public WorkOrder {
   void execute() override;
 
  private:
-  // Parse up to three octal digits (0-7) starting at '*start_pos' in
-  // 'row_string' as a char literal. '*start_pos' will be modified to
-  // the first position AFTER the parsed octal digits.
-  static char ParseOctalLiteral(const std::string &row_string,
-                                std::size_t *start_pos);
-
-  // Parse up to two hexadecimal digits (0-F, case insensitive) starting at
-  // '*start_pos' in 'row_string' as a char literal. '*start_pos' will be
-  // modified to the first position AFTER the parsed hexadecimal digits.
-  static char ParseHexLiteral(const std::string &row_string,
-                              std::size_t *start_pos);
-
-  // Read the next text row from the open FILE stream '*file' into
-  // '*row_string'. Returns false if end-of-file is reached and there are no
-  // more rows, true if a row string was successfully read. For ease of
-  // parsing, '*row_string' has the trailing row-terminator removed and
-  // replaced with a field-terminator.
-  bool readRowFromFile(FILE *file, std::string *row_string) const;
-
-  // Read the next text from blob memory starting at '**start_pos' and ending
-  // at '*end_pos' into '*row_string'. Returns false if the end of the blob is
-  // reached and there are no more rows, true if a row was successfully read.
-  // For ease of parsing, '*row_string' has the trailing row-terminator removed
-  // and replaced with a field-terminator. After call '*start_pos' points to
-  // first character AFTER the read row in the blob.
-  bool readRowFromBlob(const char **start_pos,
-                       const char *end_pos,
-                       std::string *row_string) const;
-
-  // Trim a row-terminator (newline or carriage-return + newline) off the end
-  // of '*row_string'. Returns true if the row-terminator was successfully
-  // removed, false if '*row_string' did not end in a row-terminator.
-  bool removeRowTerminator(std::string *row_string) const;
-
-  // Extract a field string starting at '*start_pos' in 'row_string' into
-  // '*field_string'. This method also expands escape sequences if
-  // 'process_escape_sequences_' is true. Returns true if a field string was
-  // successfully extracted, false in the special case where the NULL-literal
-  // string "\N" was found. Throws TextScanFormatError if text was malformed.
-  bool extractFieldString(const std::string &row_string,
-                          std::size_t *start_pos,
-                          std::string *field_string) const;
-
-  // Make a tuple by parsing all of the individual fields specified in
-  // 'row_string'.
-  Tuple parseRow(const std::string &row_string, const CatalogRelationSchema &relation) const;
-
-  const bool is_file_;
-  const std::string filename_;
-  const char field_terminator_;
-  const block_id text_blob_;
-  const std::size_t text_size_;
-  const bool process_escape_sequences_;
-
-  InsertDestination *output_destination_;
-  StorageManager *storage_manager_;
-
-  DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
-};
-
-/**
- * @brief A WorkOrder to split the file into blobs of text that can be processed
- * separately.
- **/
-class TextSplitWorkOrder : public WorkOrder {
- public:
   /**
-   * @brief Constructor.
+   * @brief Extract a field string starting at \p *field_ptr. This method also
+   *        expands escape sequences if \p process_escape_sequences_ is true.
+   *        Throws TextScanFormatError if text was malformed.
    *
-   * @param query_id The ID of the query to which this WorkOrder belongs.
-   * @param filename File to split into row-aligned blobs.
-   * @param process_escape_sequences Whether to decode escape sequences in the
-   *        text file.
-   * @param storage_manager The StorageManager to use.
-   * @param operator_index Operator index of the current operator. This is used
-   *                       to send new-work available message to Foreman.
-   * @param scheduler_client_id The TMB client ID of the scheduler thread.
-   * @param bus A pointer to the TMB.
+   * @param field_ptr \p *field_ptr points to the current position of the input
+   *        char stream for parsing. The overall char stream must end with a
+   *        newline character. After the call, \p *field_ptr will be modified to
+   *        the start position of the NEXT field string.
+   * @param is_null_literal OUTPUT parameter. Set to true if the NULL-literal
+   *        string "\N" was found.
+   * @param has_reached_end_of_line OUTPUT parameter. Set to true if the newline
+   *        character was encountered.
+   * @param field_string OUTPUT parameter. Set to the extracted field string.
    */
-  TextSplitWorkOrder(const std::size_t query_id,
-                     const std::string &filename,
-                     const bool process_escape_sequences,
-                     StorageManager *storage_manager,
-                     const std::size_t operator_index,
-                     const tmb::client_id scheduler_client_id,
-                     MessageBus *bus)
-      : WorkOrder(query_id),
-        filename_(filename),
-        process_escape_sequences_(process_escape_sequences),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)),
-        operator_index_(operator_index),
-        scheduler_client_id_(scheduler_client_id),
-        bus_(DCHECK_NOTNULL(bus)) {}
+  void extractFieldString(const char **field_ptr,
+                          bool *is_null_literal,
+                          bool *has_reached_end_of_line,
+                          std::string *field_string) const;
 
   /**
-   * @exception TextScanReadError The text file could not be opened for
-   *            reading.
+   * @brief Make a tuple by parsing all of the individual fields from a char stream.
+   *
+   * @param row_ptr \p *row_ptr points to the current position of the input
+   *        char stream for parsing. The overall char stream must end with a
+   *        newline character. After the call, \p *row_ptr will be modified to
+   *        the start position of the NEXT text row.
+   * @param relation The relation schema for the tuple.
+   * @return The tuple parsed from the char stream.
    */
-  void execute() override;
-
- private:
-  // Allocate a new blob.
-  void allocateBlob();
-
-  // Find the last row terminator in current blob.
-  std::size_t findLastRowTerminator();
+  Tuple parseRow(const char **row_ptr,
+                 const CatalogRelationSchema &relation) const;
 
-  // Send the blob info to its operator via TMB.
-  void sendBlobInfoToOperator(const bool write_row_aligned);
 
-  // Get the writeable address (unwritten chunk) in current blob.
-  inline char* writeableBlobAddress() {
-    return static_cast<char*>(text_blob_->getMemoryMutable()) + written_;
+  /**
+   * @brief Parse up to three octal digits (0-7) starting at \p *literal_ptr as
+   *        a char literal. \p *literal_ptr will be modified to the last position
+   *        of the parsed octal digits.
+   *
+   * @param literal_ptr \p *literal_ptr points to the current position of the
+   *        input char stream for parsing. The overall char stream must end with
+   *        a newline character.
+   * @return The char literal from the parsed octal digits.
+   */
+  inline static char ParseOctalLiteral(const char **literal_ptr) {
+    int value = 0;
+    const char *ptr = *literal_ptr;
+    for (int i = 0; i < 3; ++i, ++ptr) {
+      const int char_value = *ptr - '0';
+      if ((char_value >= 0) && (char_value < 8)) {
+        value = value * 8 + char_value;
+      } else {
+        break;
+      }
+    }
+    *literal_ptr = ptr - 1;
+    return value;
   }
 
-  // Number of bytes remaining to be written.
-  inline std::size_t remainingBlobBytes() const {
-    return blob_size_ - written_;
+  /**
+   * @brief Parse up to two hexadecimal digits (0-F, case insensitive) starting
+   *        at \p *literal_ptr as a char literal. \p *literal_ptr will be modified
+   *        to the last position of the parsed hexadecimal digits.
+   *
+   * @param literal_ptr \p *literal_ptr points to the current position of the
+   *        input char stream for parsing. The overall char stream must end with
+   *        a newline character.
+   * @return The char literal from the parsed hexadecimal digits.
+   */
+  inline static char ParseHexLiteral(const char **literal_ptr) {
+    int value = 0;
+    const char *ptr = *literal_ptr;
+    for (int i = 0; i < 2; ++i, ++ptr) {
+      const char c = *ptr;
+      int char_value;
+      if (std::isdigit(c)) {
+        char_value = c - '0';
+      } else if (c >= 'a' && c <= 'f') {
+        char_value = c - 'a' + 10;
+      } else if (c >= 'A' && c <= 'F') {
+        char_value = c - 'A' + 10;
+      } else {
+        break;
+      }
+      value = value * 16 + char_value;
+    }
+    *literal_ptr = ptr - 1;
+    return value;
   }
 
-  const std::string filename_;  // File to split.
+  const std::string filename_;
+  const std::size_t text_offset_;
+  const std::size_t text_segment_size_;
+  const char field_terminator_;
   const bool process_escape_sequences_;
 
+  InsertDestination *output_destination_;
   StorageManager *storage_manager_;
 
-  const std::size_t operator_index_;  // Opeartor index.
-  const tmb::client_id scheduler_client_id_;  // The scheduler's TMB client ID.
-  MessageBus *bus_;
-
-  MutableBlobReference text_blob_;  // Mutable reference to current blob.
-  block_id text_blob_id_;  // Current blob ID.
-  std::size_t written_ = 0;  // Bytes written in current blob.
-  std::size_t blob_size_ = 0;  // Size of the current blob.
-
-  DISALLOW_COPY_AND_ASSIGN(TextSplitWorkOrder);
+  DISALLOW_COPY_AND_ASSIGN(TextScanWorkOrder);
 };
 
 /** @} */

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/WorkOrder.proto
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index fd731f7..60d4c8f 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -1,5 +1,7 @@
 //   Copyright 2011-2015 Quickstep Technologies LLC.
 //   Copyright 2015-2016 Pivotal Software, Inc.
+//   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+//     University of Wisconsin—Madison.
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -18,7 +20,6 @@ syntax = "proto2";
 package quickstep.serialization;
 
 import "relational_operators/SortMergeRunOperator.proto";
-import "relational_operators/TextScanOperator.proto";
 
 enum WorkOrderType {
   AGGREGATION = 1;
@@ -39,8 +40,7 @@ enum WorkOrderType {
   SORT_RUN_GENERATION = 16;
   TABLE_GENERATOR = 17;
   TEXT_SCAN = 18;
-  TEXT_SPLIT = 19;
-  UPDATE = 20;
+  UPDATE = 19;
 }
 
 message WorkOrder {
@@ -223,15 +223,12 @@ message TableGeneratorWorkOrder {
 message TextScanWorkOrder {
   extend WorkOrder {
     // All required.
+    optional string filename = 301;
+    optional uint64 text_offset = 302;
+    optional uint64 text_segment_size = 303;
     optional uint32 field_terminator = 304;  // For one-byte char.
     optional bool process_escape_sequences = 305;
     optional int32 insert_destination_index = 306;
-
-    // Either
-    optional string filename = 307;
-
-    // Or
-    optional TextBlob text_blob = 308;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/WorkOrderFactory.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 489b666..da42b4d 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -42,7 +42,6 @@
 #include "relational_operators/SortRunGenerationOperator.hpp"
 #include "relational_operators/TableGeneratorOperator.hpp"
 #include "relational_operators/TextScanOperator.hpp"
-#include "relational_operators/TextScanOperator.pb.h"
 #include "relational_operators/UpdateOperator.hpp"
 #include "relational_operators/WorkOrder.pb.h"
 #include "storage/StorageBlockInfo.hpp"
@@ -389,40 +388,16 @@ WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder
     }
     case serialization::TEXT_SCAN: {
       LOG(INFO) << "Creating TextScanWorkOrder";
-      if (proto.HasExtension(serialization::TextScanWorkOrder::filename)) {
-        return new TextScanWorkOrder(
-            proto.query_id(),
-            proto.GetExtension(serialization::TextScanWorkOrder::filename),
-            proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
-            proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
-            query_context->getInsertDestination(
-                proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
-            storage_manager);
-      } else {
-        const serialization::TextBlob &text_blob_proto =
-            proto.GetExtension(serialization::TextScanWorkOrder::text_blob);
-
-        return new TextScanWorkOrder(
-            proto.query_id(),
-            text_blob_proto.blob_id(),
-            text_blob_proto.size(),
-            proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
-            proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
-            query_context->getInsertDestination(
-                proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
-            storage_manager);
-      }
-    }
-    case serialization::TEXT_SPLIT: {
-      LOG(INFO) << "Creating TextSplitWorkOrder";
-      return new TextSplitWorkOrder(
+      return new TextScanWorkOrder(
           proto.query_id(),
-          proto.GetExtension(serialization::TextSplitWorkOrder::filename),
-          proto.GetExtension(serialization::TextSplitWorkOrder::process_escape_sequences),
-          storage_manager,
-          proto.GetExtension(serialization::TextSplitWorkOrder::operator_index),
-          shiftboss_client_id,
-          bus);
+          proto.GetExtension(serialization::TextScanWorkOrder::filename),
+          proto.GetExtension(serialization::TextScanWorkOrder::text_offset),
+          proto.GetExtension(serialization::TextScanWorkOrder::text_segment_size),
+          proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
+          proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+          storage_manager);
     }
     case serialization::UPDATE: {
       LOG(INFO) << "Creating UpdateWorkOrder";
@@ -691,27 +666,14 @@ bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
                  proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index));
     }
     case serialization::TEXT_SCAN: {
-      if (!proto.HasExtension(serialization::TextScanWorkOrder::field_terminator) ||
-          !proto.HasExtension(serialization::TextScanWorkOrder::process_escape_sequences) ||
-          !proto.HasExtension(serialization::TextScanWorkOrder::insert_destination_index) ||
-          !query_context.isValidInsertDestinationId(
-              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index))) {
-        return false;
-      }
-
-      // Two fields are exclusive.
-      if (proto.HasExtension(serialization::TextScanWorkOrder::filename) ==
-              proto.HasExtension(serialization::TextScanWorkOrder::text_blob)) {
-        return false;
-      }
-
-      return proto.HasExtension(serialization::TextScanWorkOrder::filename) ||
-             proto.GetExtension(serialization::TextScanWorkOrder::text_blob).IsInitialized();
-    }
-    case serialization::TEXT_SPLIT: {
-      return proto.HasExtension(serialization::TextSplitWorkOrder::filename) &&
-             proto.HasExtension(serialization::TextSplitWorkOrder::process_escape_sequences) &&
-             proto.HasExtension(serialization::TextSplitWorkOrder::operator_index);
+      return proto.HasExtension(serialization::TextScanWorkOrder::filename) &&
+             proto.HasExtension(serialization::TextScanWorkOrder::text_offset) &&
+             proto.HasExtension(serialization::TextScanWorkOrder::text_segment_size) &&
+             proto.HasExtension(serialization::TextScanWorkOrder::field_terminator) &&
+             proto.HasExtension(serialization::TextScanWorkOrder::process_escape_sequences) &&
+             proto.HasExtension(serialization::TextScanWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index));
     }
     case serialization::UPDATE: {
       return proto.HasExtension(serialization::UpdateWorkOrder::relation_id) &&

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/tests/TextScanOperator_unittest.cpp
----------------------------------------------------------------------
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index ef6fc2d..5860745 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -193,7 +193,6 @@ TEST_F(TextScanOperatorTest, ScanTest) {
                            input_filename,
                            '\t',
                            true,
-                           false,
                            *relation_,
                            output_destination_index));
 

http://git-wip-us.apache.org/repos/asf/incubator-quickstep/blob/4f8fdbe8/relational_operators/tests/text_scan_input.txt
----------------------------------------------------------------------
diff --git a/relational_operators/tests/text_scan_input.txt b/relational_operators/tests/text_scan_input.txt
index bcb76bf..51015bd 100644
--- a/relational_operators/tests/text_scan_input.txt
+++ b/relational_operators/tests/text_scan_input.txt
@@ -2,9 +2,5 @@
 -1234567890	-1.2e-200	A twenty char string	1969-07-21 02:56:00	00:00:01.001	Another twenty chars
 \N	\N	\N	\N	\N	\N
 \N	\N	\\N	\N	\N	\\N
-\x34\062	\55\064\x32\56\65	\x7B\
-\t\	\\\e\s\c\a\p\e\d\x\b\n\x7d	1988-07-16\T00:00\:00\x2E0\x30\60\06001	00:00:00	'good\' \"bye"\r\n\
-\r\n\v\n\
-
-0	0.0	\\\\\
-\\\\\n	1970-01-01	0 s	\\\\
+\x34\062	\55\064\x32\56\65	\x7B\n\t\	\\\e\s\c\a\p\e\d\x\b\n\x7d	1988-07-16\T00:00\:00\x2E0\x30\60\06001	00:00:00	'good\' \"bye"\r\n\n\r\n\v\n\n
+0	0.0	\\\\\n\\\\\n	1970-01-01	0 s	\\\\