You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2018/11/05 16:24:40 UTC

[arrow] branch master updated: ARROW-3666: [C++] Improve C++ parser performance

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 67a436a  ARROW-3666: [C++] Improve C++ parser performance
67a436a is described below

commit 67a436aed11cc0b08da4a4f310dacc380e29de0d
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Nov 5 17:24:22 2018 +0100

    ARROW-3666: [C++] Improve C++ parser performance
    
    Make CSV parsing around 30% faster.
    
    Benchmark of reading a CSV file with integer columns. Before:
    * multi-threaded: 1.2 GB/s
    * single-threaded: 190 MB/s
    
    After:
    * multi-threaded: 1.6 GB/s
    * single-threaded: 210 MB/s
    
    (on an 8-core 16-thread AMD Ryzen CPU)
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #2886 from pitrou/ARROW-3666-faster-csv-parser and squashes the following commits:
    
    1c60c54b <Antoine Pitrou> Address review comments
    91216a35 <Antoine Pitrou> ARROW-3666:  Improve C++ parser performance
---
 cpp/src/arrow/csv/parser.cc | 403 +++++++++++++++++++++++++++++++++-----------
 cpp/src/arrow/csv/parser.h  |  79 +++++----
 cpp/src/arrow/csv/reader.cc |  11 +-
 3 files changed, 357 insertions(+), 136 deletions(-)

diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc
index 240b1f8..ea602df 100644
--- a/cpp/src/arrow/csv/parser.cc
+++ b/cpp/src/arrow/csv/parser.cc
@@ -17,9 +17,12 @@
 
 #include "arrow/csv/parser.h"
 
+#include <algorithm>
 #include <cstdio>
 #include <sstream>
+#include <utility>
 
+#include "arrow/memory_pool.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
 
@@ -38,109 +41,228 @@ static Status MismatchingColumns(int32_t expected, int32_t actual) {
   return ParseError(s);
 }
 
-BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
-    : options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {}
+template <bool Quoting, bool Escaping>
+class SpecializedOptions {
+ public:
+  static constexpr bool quoting = Quoting;
+  static constexpr bool escaping = Escaping;
+};
+
+// A helper class allocating the buffer for parsed values and writing into it
+// without any further resizes, except at the end.
+class BlockParser::PresizedParsedWriter {
+ public:
+  PresizedParsedWriter(MemoryPool* pool, uint32_t size)
+      : parsed_size_(0), parsed_capacity_(size) {
+    ARROW_CHECK_OK(AllocateResizableBuffer(pool, parsed_capacity_, &parsed_buffer_));
+    parsed_ = parsed_buffer_->mutable_data();
+  }
+
+  void Finish(std::shared_ptr<Buffer>* out_parsed) {
+    ARROW_CHECK_OK(parsed_buffer_->Resize(parsed_size_));
+    *out_parsed = parsed_buffer_;
+  }
+
+  void BeginLine() { saved_parsed_size_ = parsed_size_; }
+
+  void PushFieldChar(char c) {
+    DCHECK_LT(parsed_size_, parsed_capacity_);
+    parsed_[parsed_size_++] = static_cast<uint8_t>(c);
+  }
+
+  // Rollback the state that was saved in BeginLine()
+  void RollbackLine() { parsed_size_ = saved_parsed_size_; }
+
+  int64_t size() { return parsed_size_; }
+
+  std::shared_ptr<ResizableBuffer> parsed_buffer_;
+  uint8_t* parsed_;
+  int64_t parsed_size_;
+  int64_t parsed_capacity_;
+  // Checkpointing, for when an incomplete line is encountered at end of block
+  int64_t saved_parsed_size_;
+};
+
+// A helper class handling a growable buffer for values offsets.  This class is
+// used when the number of columns is not yet known and we therefore cannot
+// efficiently presize the target area for a given number of rows.
+class BlockParser::ResizableValuesWriter {
+ public:
+  explicit ResizableValuesWriter(MemoryPool* pool)
+      : values_size_(0), values_capacity_(256) {
+    ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
+                                           &values_buffer_));
+    values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
+  }
+
+  template <typename ParsedWriter>
+  void Start(ParsedWriter& parsed_writer) {
+    PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
+  }
+
+  void Finish(std::shared_ptr<Buffer>* out_values) {
+    ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
+    *out_values = values_buffer_;
+  }
+
+  void BeginLine() { saved_values_size_ = values_size_; }
+
+  void StartField(bool quoted) { quoted_ = quoted; }
+
+  template <typename ParsedWriter>
+  void FinishField(ParsedWriter* parsed_writer) {
+    PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
+  }
+
+  // Rollback the state that was saved in BeginLine()
+  void RollbackLine() { values_size_ = saved_values_size_; }
+
+ protected:
+  void PushValue(ValueDesc v) {
+    if (ARROW_PREDICT_FALSE(values_size_ == values_capacity_)) {
+      values_capacity_ = values_capacity_ * 2;
+      ARROW_CHECK_OK(values_buffer_->Resize(values_capacity_ * sizeof(*values_)));
+      values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
+    }
+    values_[values_size_++] = v;
+  }
+
+  std::shared_ptr<ResizableBuffer> values_buffer_;
+  ValueDesc* values_;
+  int64_t values_size_;
+  int64_t values_capacity_;
+  bool quoted_;
+  // Checkpointing, for when an incomplete line is encountered at end of block
+  int64_t saved_values_size_;
+};
 
-Status BlockParser::ParseLine(const char* data, const char* data_end, bool is_final,
+// A helper class allocating the buffer for values offsets and writing into it
+// without any further resizes, except at the end.  This class is used once the
+// number of columns is known, as it eliminates resizes and generates simpler,
+// faster CSV parsing code.
+class BlockParser::PresizedValuesWriter {
+ public:
+  PresizedValuesWriter(MemoryPool* pool, int32_t num_rows, int32_t num_cols)
+      : values_size_(0), values_capacity_(1 + num_rows * num_cols) {
+    ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
+                                           &values_buffer_));
+    values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
+  }
+
+  template <typename ParsedWriter>
+  void Start(ParsedWriter& parsed_writer) {
+    PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
+  }
+
+  void Finish(std::shared_ptr<Buffer>* out_values) {
+    ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
+    *out_values = values_buffer_;
+  }
+
+  void BeginLine() { saved_values_size_ = values_size_; }
+
+  void StartField(bool quoted) { quoted_ = quoted; }
+
+  template <typename ParsedWriter>
+  void FinishField(ParsedWriter* parsed_writer) {
+    PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
+  }
+
+  // Rollback the state that was saved in BeginLine()
+  void RollbackLine() { values_size_ = saved_values_size_; }
+
+ protected:
+  void PushValue(ValueDesc v) {
+    DCHECK_LT(values_size_, values_capacity_);
+    values_[values_size_++] = v;
+  }
+
+  std::shared_ptr<ResizableBuffer> values_buffer_;
+  ValueDesc* values_;
+  int64_t values_size_;
+  const int64_t values_capacity_;
+  bool quoted_;
+  // Checkpointing, for when an incomplete line is encountered at end of block
+  int64_t saved_values_size_;
+};
+
+template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+Status BlockParser::ParseLine(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+                              const char* data, const char* data_end, bool is_final,
                               const char** out_data) {
   int32_t num_cols = 0;
   char c;
-#ifdef CSV_PARSER_USE_BITFIELD
-  bool quoted;
-#endif
 
-  auto saved_parsed_size = parsed_.size();
-#ifdef CSV_PARSER_USE_BITFIELD
-  auto saved_values_size = values_.size();
-#else
-  auto saved_offsets_size = offsets_.size();
-  auto saved_quoted_size = quoted_.size();
-#endif
+  values_writer->BeginLine();
+  parsed_writer->BeginLine();
 
-  // Subroutines to manage parser state
-  auto InitField = [&]() {};
-  auto PushFieldChar = [&](char c) { parsed_.push_back(static_cast<uint8_t>(c)); };
-  auto FinishField = [&]() {
-#ifdef CSV_PARSER_USE_BITFIELD
-    ValueDesc v = {static_cast<uint32_t>(parsed_.size()) & 0x7fffffffU, quoted};
-    values_.push_back(v);
-#else
-    offsets_.push_back(parsed_.size());
-#endif
-    ++num_cols;
-  };
-  auto RewindState = [&]() {
-    parsed_.resize(saved_parsed_size);
-#ifdef CSV_PARSER_USE_BITFIELD
-    values_.resize(saved_values_size);
-#else
-    offsets_.resize(saved_offsets_size);
-    quoted_.resize(saved_quoted_size);
-#endif
-  };
+  auto FinishField = [&]() { values_writer->FinishField(parsed_writer); };
+
+  DCHECK_GT(data_end, data);
 
   // The parsing state machine
 
 FieldStart:
   // At the start of a field
-  InitField();
   // Quoting is only recognized at start of field
-  if (options_.quoting && data != data_end && *data == options_.quote_char) {
+  if (SpecializedOptions::quoting && ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
     ++data;
-#ifdef CSV_PARSER_USE_BITFIELD
-    quoted = true;
-#else
-    quoted_.push_back(true);
-#endif
+    values_writer->StartField(true /* quoted */);
     goto InQuotedField;
   } else {
-#ifdef CSV_PARSER_USE_BITFIELD
-    quoted = false;
-#else
-    quoted_.push_back(false);
-#endif
+    values_writer->StartField(false /* quoted */);
     goto InField;
   }
 
 InField:
   // Inside a non-quoted part of a field
-  if (data == data_end) {
+  if (ARROW_PREDICT_FALSE(data == data_end)) {
     goto AbortLine;
   }
   c = *data++;
-  if (options_.escaping && c == options_.escape_char) {
-    if (data == data_end) {
+  if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
+    if (ARROW_PREDICT_FALSE(data == data_end)) {
       goto AbortLine;
     }
     c = *data++;
-    PushFieldChar(c);
+    parsed_writer->PushFieldChar(c);
     goto InField;
   }
-  if (c == '\n' || c == '\r') {
-    goto LineEnd;
-  }
-  if (c == options_.delimiter) {
+  if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
     goto FieldEnd;
   }
-  PushFieldChar(c);
+  if (ARROW_PREDICT_FALSE(c < ' ')) {
+    if (c == '\r') {
+      // In the middle of a newline separator?
+      if (ARROW_PREDICT_TRUE(data < data_end) && *data == '\n') {
+        data++;
+      }
+      goto LineEnd;
+    }
+    if (c == '\n') {
+      goto LineEnd;
+    }
+  }
+  parsed_writer->PushFieldChar(c);
   goto InField;
 
 InQuotedField:
   // Inside a quoted part of a field
-  if (data == data_end) {
+  if (ARROW_PREDICT_FALSE(data == data_end)) {
     goto AbortLine;
   }
   c = *data++;
-  if (options_.escaping && c == options_.escape_char) {
-    if (data == data_end) {
+  if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
+    if (ARROW_PREDICT_FALSE(data == data_end)) {
       goto AbortLine;
     }
     c = *data++;
-    PushFieldChar(c);
+    parsed_writer->PushFieldChar(c);
     goto InQuotedField;
   }
-  if (c == options_.quote_char) {
-    if (options_.double_quote && data < data_end && *data == options_.quote_char) {
+  if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
+    if (options_.double_quote && ARROW_PREDICT_TRUE(data < data_end) &&
+        ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
       // Double-quoting
       ++data;
     } else {
@@ -148,23 +270,25 @@ InQuotedField:
       goto InField;
     }
   }
-  PushFieldChar(c);
+  parsed_writer->PushFieldChar(c);
   goto InQuotedField;
 
 FieldEnd:
   // At the end of a field
   FinishField();
+  ++num_cols;
+  if (ARROW_PREDICT_FALSE(data == data_end)) {
+    goto AbortLine;
+  }
   goto FieldStart;
 
 LineEnd:
-  // At the end of line, possibly in the middle of the newline separator
+  // At the end of line
   FinishField();
-  if (data < data_end && data[-1] == '\r' && *data == '\n') {
-    data++;
-  }
-  if (num_cols_ == -1) {
+  ++num_cols;
+  if (ARROW_PREDICT_FALSE(num_cols_ == -1)) {
     num_cols_ = num_cols;
-  } else if (num_cols != num_cols_) {
+  } else if (ARROW_PREDICT_FALSE(num_cols != num_cols_)) {
     return MismatchingColumns(num_cols_, num_cols);
   }
   *out_data = data;
@@ -174,6 +298,7 @@ AbortLine:
   // Not a full line except perhaps if in final block
   if (is_final) {
     FinishField();
+    ++num_cols;
     if (num_cols_ == -1) {
       num_cols_ = num_cols;
     } else if (num_cols != num_cols_) {
@@ -183,52 +308,133 @@ AbortLine:
     return Status::OK();
   }
   // Truncated line at end of block, rewind parsed state
-  RewindState();
+  values_writer->RollbackLine();
+  parsed_writer->RollbackLine();
   return Status::OK();
 }
 
-Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final,
-                            uint32_t* out_size) {
-  num_rows_ = 0;
-  // These don't shrink the allocated capacity, so reuses of BlockParser
-  // avoid most allocations when appending data
-  parsed_.clear();
-#ifdef CSV_PARSER_USE_BITFIELD
-  values_.clear();
-  values_.push_back({0, false});
-#else
-  offsets_.clear();
-  offsets_.push_back(0);
-  quoted_.clear();
-#endif
-
-  const char* data = start;
-  const char* data_end = start + size;
-
-  while (data < data_end && num_rows_ < max_num_rows_) {
+template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+Status BlockParser::ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+                               const char* data, const char* data_end, bool is_final,
+                               int32_t rows_in_chunk, const char** out_data,
+                               bool* finished_parsing) {
+  while (data < data_end && rows_in_chunk > 0) {
     const char* line_end = data;
-    RETURN_NOT_OK(ParseLine(data, data_end, is_final, &line_end));
+    RETURN_NOT_OK(ParseLine<SpecializedOptions>(values_writer, parsed_writer, data,
+                                                data_end, is_final, &line_end));
     if (line_end == data) {
       // Cannot parse any further
+      *finished_parsing = true;
       break;
     }
     data = line_end;
     ++num_rows_;
+    --rows_in_chunk;
+  }
+  // Append new buffers and update size
+  std::shared_ptr<Buffer> values_buffer;
+  values_writer->Finish(&values_buffer);
+  if (values_buffer->size() > 0) {
+    values_size_ += static_cast<int32_t>(values_buffer->size() / sizeof(ValueDesc) - 1);
+    values_buffers_.push_back(std::move(values_buffer));
   }
+  *out_data = data;
+  return Status::OK();
+}
+
+template <typename SpecializedOptions>
+Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is_final,
+                                       uint32_t* out_size) {
+  num_rows_ = 0;
+  values_size_ = 0;
+  parsed_size_ = 0;
+  values_buffers_.clear();
+  parsed_buffer_.reset();
+  parsed_ = nullptr;
+
+  const char* data = start;
+  const char* data_end = start + size;
+  bool finished_parsing = false;
+
+  PresizedParsedWriter parsed_writer(pool_, size);
+
+  if (num_cols_ == -1) {
+    // Can't presize values when the number of columns is not known, first parse
+    // a single line
+    const int32_t rows_in_chunk = 1;
+    ResizableValuesWriter values_writer(pool_);
+    values_writer.Start(parsed_writer);
+
+    RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data,
+                                                 data_end, is_final, rows_in_chunk, &data,
+                                                 &finished_parsing));
+  }
+  while (!finished_parsing && data < data_end && num_rows_ < max_num_rows_) {
+    // We know the number of columns, so can presize a values array for
+    // a given number of rows
+    DCHECK_GE(num_cols_, 0);
+
+    int32_t rows_in_chunk;
+    if (num_cols_ > 0) {
+      rows_in_chunk = std::min(32768 / num_cols_, max_num_rows_ - num_rows_);
+    } else {
+      rows_in_chunk = std::min(32768, max_num_rows_ - num_rows_);
+    }
+
+    PresizedValuesWriter values_writer(pool_, rows_in_chunk, num_cols_);
+    values_writer.Start(parsed_writer);
+
+    RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data,
+                                                 data_end, is_final, rows_in_chunk, &data,
+                                                 &finished_parsing));
+  }
+
+  parsed_writer.Finish(&parsed_buffer_);
+  parsed_size_ = static_cast<int32_t>(parsed_buffer_->size());
+  parsed_ = parsed_buffer_->data();
+
+  DCHECK_EQ(values_size_, num_rows_ * num_cols_);
   if (num_cols_ == -1) {
     DCHECK_EQ(num_rows_, 0);
   }
-#ifdef CSV_PARSER_USE_BITFIELD
-  DCHECK_EQ(values_.size(), 1 + static_cast<size_t>(num_rows_ * num_cols_));
-  DCHECK_EQ(parsed_.size(), values_[values_.size() - 1].offset);
-#else
-  DCHECK_EQ(offsets_.size(), 1 + static_cast<size_t>(num_rows_ * num_cols_));
-  DCHECK_EQ(parsed_.size(), offsets_[offsets_.size() - 1]);
+#ifndef NDEBUG
+  if (num_rows_ > 0) {
+    DCHECK_GT(values_buffers_.size(), 0);
+    auto& last_values_buffer = values_buffers_.back();
+    auto last_values = reinterpret_cast<const ValueDesc*>(last_values_buffer->data());
+    auto last_values_size = last_values_buffer->size() / sizeof(ValueDesc);
+    auto check_parsed_size =
+        static_cast<int32_t>(last_values[last_values_size - 1].offset);
+    DCHECK_EQ(parsed_size_, check_parsed_size);
+  } else {
+    DCHECK_EQ(parsed_size_, 0);
+  }
 #endif
   *out_size = static_cast<uint32_t>(data - start);
   return Status::OK();
 }
 
+Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final,
+                            uint32_t* out_size) {
+  if (options_.quoting) {
+    if (options_.escaping) {
+      return DoParseSpecialized<SpecializedOptions<true, true>>(start, size, is_final,
+                                                                out_size);
+    } else {
+      return DoParseSpecialized<SpecializedOptions<true, false>>(start, size, is_final,
+                                                                 out_size);
+    }
+  } else {
+    if (options_.escaping) {
+      return DoParseSpecialized<SpecializedOptions<false, true>>(start, size, is_final,
+                                                                 out_size);
+    } else {
+      return DoParseSpecialized<SpecializedOptions<false, false>>(start, size, is_final,
+                                                                  out_size);
+    }
+  }
+}
+
 Status BlockParser::Parse(const char* data, uint32_t size, uint32_t* out_size) {
   return DoParse(data, size, false /* is_final */, out_size);
 }
@@ -237,5 +443,12 @@ Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_si
   return DoParse(data, size, true /* is_final */, out_size);
 }
 
+BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols,
+                         int32_t max_num_rows)
+    : pool_(pool), options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {}
+
+BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
+    : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {}
+
 }  // namespace csv
 }  // namespace arrow
diff --git a/cpp/src/arrow/csv/parser.h b/cpp/src/arrow/csv/parser.h
index 4529fce..0e1a415 100644
--- a/cpp/src/arrow/csv/parser.h
+++ b/cpp/src/arrow/csv/parser.h
@@ -22,21 +22,20 @@
 #include <memory>
 #include <vector>
 
+#include "arrow/buffer.h"
 #include "arrow/csv/options.h"
 #include "arrow/status.h"
 #include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
+
+class MemoryPool;
+
 namespace csv {
 
 constexpr int32_t kMaxParserNumRows = 100000;
 
-// Whether BlockParser will use bitfields for better memory consumption
-// and cache locality.
-#undef CSV_PARSER_USE_BITFIELD
-#define CSV_PARSER_USE_BITFIELD
-
 /// \class BlockParser
 /// \brief A reusable block-based parser for CSV data
 ///
@@ -53,6 +52,8 @@ class ARROW_EXPORT BlockParser {
  public:
   explicit BlockParser(ParseOptions options, int32_t num_cols = -1,
                        int32_t max_num_rows = kMaxParserNumRows);
+  explicit BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols = -1,
+                       int32_t max_num_rows = kMaxParserNumRows);
 
   /// \brief Parse a block of data
   ///
@@ -69,7 +70,7 @@ class ARROW_EXPORT BlockParser {
   int32_t num_rows() const { return num_rows_; }
   int32_t num_cols() const { return num_cols_; }
   /// \brief Return the total size in bytes of parsed data
-  int32_t num_bytes() const { return static_cast<int32_t>(parsed_.size()); }
+  uint32_t num_bytes() const { return parsed_size_; }
 
   /// \brief Visit parsed values in a column
   ///
@@ -77,36 +78,42 @@ class ARROW_EXPORT BlockParser {
   /// Status(const uint8_t* data, uint32_t size, bool quoted)
   template <typename Visitor>
   Status VisitColumn(int32_t col_index, Visitor&& visit) const {
-#ifdef CSV_PARSER_USE_BITFIELD
-    for (int32_t pos = col_index; pos < num_cols_ * num_rows_; pos += num_cols_) {
-      auto start = values_[pos].offset;
-      auto stop = values_[pos + 1].offset;
-      auto quoted = values_[pos + 1].quoted;
-      ARROW_RETURN_NOT_OK(visit(parsed_.data() + start, stop - start, quoted));
+    for (size_t buf_index = 0; buf_index < values_buffers_.size(); ++buf_index) {
+      const auto& values_buffer = values_buffers_[buf_index];
+      const auto values = reinterpret_cast<const ValueDesc*>(values_buffer->data());
+      const auto max_pos =
+          static_cast<int32_t>(values_buffer->size() / sizeof(ValueDesc)) - 1;
+      for (int32_t pos = col_index; pos < max_pos; pos += num_cols_) {
+        auto start = values[pos].offset;
+        auto stop = values[pos + 1].offset;
+        auto quoted = values[pos + 1].quoted;
+        ARROW_RETURN_NOT_OK(visit(parsed_ + start, stop - start, quoted));
+      }
     }
-#else
-    for (int32_t pos = col_index; pos < num_cols_ * num_rows_; pos += num_cols_) {
-      auto start = offsets_[pos];
-      auto stop = offsets_[pos + 1];
-      auto quoted = quoted_[pos];
-      ARROW_RETURN_NOT_OK(visit(parsed_.data() + start, stop - start, quoted));
-    }
-#endif
     return Status::OK();
   }
 
-  // XXX add a ClearColumn method to signal that a column won't be visited anymore?
-
  protected:
   ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);
 
   Status DoParse(const char* data, uint32_t size, bool is_final, uint32_t* out_size);
+  template <typename SpecializedOptions>
+  Status DoParseSpecialized(const char* data, uint32_t size, bool is_final,
+                            uint32_t* out_size);
+
+  template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+  Status ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+                    const char* data, const char* data_end, bool is_final,
+                    int32_t rows_in_chunk, const char** out_data, bool* finished_parsing);
 
   // Parse a single line from the data pointer
-  Status ParseLine(const char* data, const char* data_end, bool is_final,
+  template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+  Status ParseLine(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+                   const char* data, const char* data_end, bool is_final,
                    const char** out_data);
 
-  ParseOptions options_;
+  MemoryPool* pool_;
+  const ParseOptions options_;
   // The number of rows parsed from the block
   int32_t num_rows_;
   // The number of columns (can be -1 at start)
@@ -115,22 +122,22 @@ class ARROW_EXPORT BlockParser {
   int32_t max_num_rows_;
 
   // Linear scratchpad for parsed values
-  // XXX should we ensure it's padded with 8 or 16 excess zero bytes? it could help
-  // with null parsing...
-  // TODO should be able to presize scratch space
-  std::vector<uint8_t> parsed_;
-#ifdef CSV_PARSER_USE_BITFIELD
   struct ValueDesc {
     uint32_t offset : 31;
     bool quoted : 1;
   };
-  std::vector<ValueDesc> values_;
-#else
-  // Value offsets inside the scratchpad
-  std::vector<uint32_t> offsets_;
-  // Whether each value was quoted or not
-  std::vector<bool> quoted_;
-#endif
+
+  // XXX should we ensure the parsed buffer is padded with 8 or 16 excess zero bytes?
+  // It may help with null parsing...
+  std::vector<std::shared_ptr<Buffer>> values_buffers_;
+  std::shared_ptr<Buffer> parsed_buffer_;
+  const uint8_t* parsed_;
+  int32_t values_size_;
+  int32_t parsed_size_;
+
+  class ResizableValuesWriter;
+  class PresizedValuesWriter;
+  class PresizedParsedWriter;
 };
 
 }  // namespace csv
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 730abe4..6fbbd42 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -138,7 +138,7 @@ class BaseTableReader : public csv::TableReader {
       return Status::Invalid("header_rows == 0 needs explicit column names");
     }
 
-    BlockParser parser(parse_options_, num_cols_, parse_options_.header_rows);
+    BlockParser parser(pool_, parse_options_, num_cols_, parse_options_.header_rows);
 
     uint32_t parsed_size = 0;
     RETURN_NOT_OK(parser.Parse(reinterpret_cast<const char*>(cur_data_),
@@ -251,7 +251,8 @@ class SerialTableReader : public BaseTableReader {
     RETURN_NOT_OK(ProcessHeader());
 
     static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
-    auto parser = std::make_shared<BlockParser>(parse_options_, num_cols_, max_num_rows);
+    auto parser =
+        std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
     while (!eof_) {
       // Consume current block
       uint32_t parsed_size = 0;
@@ -338,8 +339,8 @@ class ThreadedTableReader : public BaseTableReader {
 
         // "mutable" allows to modify captured by-copy chunk_buffer
         task_group_->Append([=]() mutable -> Status {
-          auto parser =
-              std::make_shared<BlockParser>(parse_options_, num_cols_, max_num_rows);
+          auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_cols_,
+                                                      max_num_rows);
           uint32_t parsed_size = 0;
           RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(chunk_data),
                                       chunk_size, &parsed_size));
@@ -374,7 +375,7 @@ class ThreadedTableReader : public BaseTableReader {
         builder->SetTaskGroup(task_group_);
       }
       auto parser =
-          std::make_shared<BlockParser>(parse_options_, num_cols_, max_num_rows);
+          std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
       uint32_t parsed_size = 0;
       RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_),
                                        static_cast<uint32_t>(cur_size_), &parsed_size));