You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this text version.
Posted to commits@arrow.apache.org by ap...@apache.org on 2018/11/05 16:24:40 UTC
[arrow] branch master updated: ARROW-3666: [C++] Improve C++ parser performance
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 67a436a ARROW-3666: [C++] Improve C++ parser performance
67a436a is described below
commit 67a436aed11cc0b08da4a4f310dacc380e29de0d
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Nov 5 17:24:22 2018 +0100
ARROW-3666: [C++] Improve C++ parser performance
Make CSV parsing faster: around 30% faster multi-threaded and around 10% faster single-threaded.
Benchmark of reading a CSV file with integer columns. Before:
* multi-threaded: 1.2 GB/s
* single-threaded: 190 MB/s
After:
* multi-threaded: 1.6 GB/s
* single-threaded: 210 MB/s
(on an 8-core, 16-thread AMD Ryzen CPU)
Author: Antoine Pitrou <an...@python.org>
Closes #2886 from pitrou/ARROW-3666-faster-csv-parser and squashes the following commits:
1c60c54b <Antoine Pitrou> Address review comments
91216a35 <Antoine Pitrou> ARROW-3666: Improve C++ parser performance
---
cpp/src/arrow/csv/parser.cc | 403 +++++++++++++++++++++++++++++++++-----------
cpp/src/arrow/csv/parser.h | 79 +++++----
cpp/src/arrow/csv/reader.cc | 11 +-
3 files changed, 357 insertions(+), 136 deletions(-)
diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc
index 240b1f8..ea602df 100644
--- a/cpp/src/arrow/csv/parser.cc
+++ b/cpp/src/arrow/csv/parser.cc
@@ -17,9 +17,12 @@
#include "arrow/csv/parser.h"
+#include <algorithm>
#include <cstdio>
#include <sstream>
+#include <utility>
+#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
@@ -38,109 +41,228 @@ static Status MismatchingColumns(int32_t expected, int32_t actual) {
return ParseError(s);
}
-BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
- : options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {}
+template <bool Quoting, bool Escaping>
+class SpecializedOptions {
+ public:
+ static constexpr bool quoting = Quoting;
+ static constexpr bool escaping = Escaping;
+};
+
+// A helper class allocating the buffer for parsed values and writing into it
+// without any further resizes, except at the end.
+class BlockParser::PresizedParsedWriter {
+ public:
+ PresizedParsedWriter(MemoryPool* pool, uint32_t size)
+ : parsed_size_(0), parsed_capacity_(size) {
+ ARROW_CHECK_OK(AllocateResizableBuffer(pool, parsed_capacity_, &parsed_buffer_));
+ parsed_ = parsed_buffer_->mutable_data();
+ }
+
+ void Finish(std::shared_ptr<Buffer>* out_parsed) {
+ ARROW_CHECK_OK(parsed_buffer_->Resize(parsed_size_));
+ *out_parsed = parsed_buffer_;
+ }
+
+ void BeginLine() { saved_parsed_size_ = parsed_size_; }
+
+ void PushFieldChar(char c) {
+ DCHECK_LT(parsed_size_, parsed_capacity_);
+ parsed_[parsed_size_++] = static_cast<uint8_t>(c);
+ }
+
+ // Rollback the state that was saved in BeginLine()
+ void RollbackLine() { parsed_size_ = saved_parsed_size_; }
+
+ int64_t size() { return parsed_size_; }
+
+ std::shared_ptr<ResizableBuffer> parsed_buffer_;
+ uint8_t* parsed_;
+ int64_t parsed_size_;
+ int64_t parsed_capacity_;
+ // Checkpointing, for when an incomplete line is encountered at end of block
+ int64_t saved_parsed_size_;
+};
+
+// A helper class handling a growable buffer for values offsets. This class is
+// used when the number of columns is not yet known and we therefore cannot
+// efficiently presize the target area for a given number of rows.
+class BlockParser::ResizableValuesWriter {
+ public:
+ explicit ResizableValuesWriter(MemoryPool* pool)
+ : values_size_(0), values_capacity_(256) {
+ ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
+ &values_buffer_));
+ values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
+ }
+
+ template <typename ParsedWriter>
+ void Start(ParsedWriter& parsed_writer) {
+ PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
+ }
+
+ void Finish(std::shared_ptr<Buffer>* out_values) {
+ ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
+ *out_values = values_buffer_;
+ }
+
+ void BeginLine() { saved_values_size_ = values_size_; }
+
+ void StartField(bool quoted) { quoted_ = quoted; }
+
+ template <typename ParsedWriter>
+ void FinishField(ParsedWriter* parsed_writer) {
+ PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
+ }
+
+ // Rollback the state that was saved in BeginLine()
+ void RollbackLine() { values_size_ = saved_values_size_; }
+
+ protected:
+ void PushValue(ValueDesc v) {
+ if (ARROW_PREDICT_FALSE(values_size_ == values_capacity_)) {
+ values_capacity_ = values_capacity_ * 2;
+ ARROW_CHECK_OK(values_buffer_->Resize(values_capacity_ * sizeof(*values_)));
+ values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
+ }
+ values_[values_size_++] = v;
+ }
+
+ std::shared_ptr<ResizableBuffer> values_buffer_;
+ ValueDesc* values_;
+ int64_t values_size_;
+ int64_t values_capacity_;
+ bool quoted_;
+ // Checkpointing, for when an incomplete line is encountered at end of block
+ int64_t saved_values_size_;
+};
-Status BlockParser::ParseLine(const char* data, const char* data_end, bool is_final,
+// A helper class allocating the buffer for values offsets and writing into it
+// without any further resizes, except at the end. This class is used once the
+// number of columns is known, as it eliminates resizes and generates simpler,
+// faster CSV parsing code.
+class BlockParser::PresizedValuesWriter {
+ public:
+ PresizedValuesWriter(MemoryPool* pool, int32_t num_rows, int32_t num_cols)
+ : values_size_(0), values_capacity_(1 + num_rows * num_cols) {
+ ARROW_CHECK_OK(AllocateResizableBuffer(pool, values_capacity_ * sizeof(*values_),
+ &values_buffer_));
+ values_ = reinterpret_cast<ValueDesc*>(values_buffer_->mutable_data());
+ }
+
+ template <typename ParsedWriter>
+ void Start(ParsedWriter& parsed_writer) {
+ PushValue({static_cast<uint32_t>(parsed_writer.size()) & 0x7fffffffU, false});
+ }
+
+ void Finish(std::shared_ptr<Buffer>* out_values) {
+ ARROW_CHECK_OK(values_buffer_->Resize(values_size_ * sizeof(*values_)));
+ *out_values = values_buffer_;
+ }
+
+ void BeginLine() { saved_values_size_ = values_size_; }
+
+ void StartField(bool quoted) { quoted_ = quoted; }
+
+ template <typename ParsedWriter>
+ void FinishField(ParsedWriter* parsed_writer) {
+ PushValue({static_cast<uint32_t>(parsed_writer->size()) & 0x7fffffffU, quoted_});
+ }
+
+ // Rollback the state that was saved in BeginLine()
+ void RollbackLine() { values_size_ = saved_values_size_; }
+
+ protected:
+ void PushValue(ValueDesc v) {
+ DCHECK_LT(values_size_, values_capacity_);
+ values_[values_size_++] = v;
+ }
+
+ std::shared_ptr<ResizableBuffer> values_buffer_;
+ ValueDesc* values_;
+ int64_t values_size_;
+ const int64_t values_capacity_;
+ bool quoted_;
+ // Checkpointing, for when an incomplete line is encountered at end of block
+ int64_t saved_values_size_;
+};
+
+template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+Status BlockParser::ParseLine(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+ const char* data, const char* data_end, bool is_final,
const char** out_data) {
int32_t num_cols = 0;
char c;
-#ifdef CSV_PARSER_USE_BITFIELD
- bool quoted;
-#endif
- auto saved_parsed_size = parsed_.size();
-#ifdef CSV_PARSER_USE_BITFIELD
- auto saved_values_size = values_.size();
-#else
- auto saved_offsets_size = offsets_.size();
- auto saved_quoted_size = quoted_.size();
-#endif
+ values_writer->BeginLine();
+ parsed_writer->BeginLine();
- // Subroutines to manage parser state
- auto InitField = [&]() {};
- auto PushFieldChar = [&](char c) { parsed_.push_back(static_cast<uint8_t>(c)); };
- auto FinishField = [&]() {
-#ifdef CSV_PARSER_USE_BITFIELD
- ValueDesc v = {static_cast<uint32_t>(parsed_.size()) & 0x7fffffffU, quoted};
- values_.push_back(v);
-#else
- offsets_.push_back(parsed_.size());
-#endif
- ++num_cols;
- };
- auto RewindState = [&]() {
- parsed_.resize(saved_parsed_size);
-#ifdef CSV_PARSER_USE_BITFIELD
- values_.resize(saved_values_size);
-#else
- offsets_.resize(saved_offsets_size);
- quoted_.resize(saved_quoted_size);
-#endif
- };
+ auto FinishField = [&]() { values_writer->FinishField(parsed_writer); };
+
+ DCHECK_GT(data_end, data);
// The parsing state machine
FieldStart:
// At the start of a field
- InitField();
// Quoting is only recognized at start of field
- if (options_.quoting && data != data_end && *data == options_.quote_char) {
+ if (SpecializedOptions::quoting && ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
++data;
-#ifdef CSV_PARSER_USE_BITFIELD
- quoted = true;
-#else
- quoted_.push_back(true);
-#endif
+ values_writer->StartField(true /* quoted */);
goto InQuotedField;
} else {
-#ifdef CSV_PARSER_USE_BITFIELD
- quoted = false;
-#else
- quoted_.push_back(false);
-#endif
+ values_writer->StartField(false /* quoted */);
goto InField;
}
InField:
// Inside a non-quoted part of a field
- if (data == data_end) {
+ if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
- if (options_.escaping && c == options_.escape_char) {
- if (data == data_end) {
+ if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
+ if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
- PushFieldChar(c);
+ parsed_writer->PushFieldChar(c);
goto InField;
}
- if (c == '\n' || c == '\r') {
- goto LineEnd;
- }
- if (c == options_.delimiter) {
+ if (ARROW_PREDICT_FALSE(c == options_.delimiter)) {
goto FieldEnd;
}
- PushFieldChar(c);
+ if (ARROW_PREDICT_FALSE(c < ' ')) {
+ if (c == '\r') {
+ // In the middle of a newline separator?
+ if (ARROW_PREDICT_TRUE(data < data_end) && *data == '\n') {
+ data++;
+ }
+ goto LineEnd;
+ }
+ if (c == '\n') {
+ goto LineEnd;
+ }
+ }
+ parsed_writer->PushFieldChar(c);
goto InField;
InQuotedField:
// Inside a quoted part of a field
- if (data == data_end) {
+ if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
- if (options_.escaping && c == options_.escape_char) {
- if (data == data_end) {
+ if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
+ if (ARROW_PREDICT_FALSE(data == data_end)) {
goto AbortLine;
}
c = *data++;
- PushFieldChar(c);
+ parsed_writer->PushFieldChar(c);
goto InQuotedField;
}
- if (c == options_.quote_char) {
- if (options_.double_quote && data < data_end && *data == options_.quote_char) {
+ if (ARROW_PREDICT_FALSE(c == options_.quote_char)) {
+ if (options_.double_quote && ARROW_PREDICT_TRUE(data < data_end) &&
+ ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
// Double-quoting
++data;
} else {
@@ -148,23 +270,25 @@ InQuotedField:
goto InField;
}
}
- PushFieldChar(c);
+ parsed_writer->PushFieldChar(c);
goto InQuotedField;
FieldEnd:
// At the end of a field
FinishField();
+ ++num_cols;
+ if (ARROW_PREDICT_FALSE(data == data_end)) {
+ goto AbortLine;
+ }
goto FieldStart;
LineEnd:
- // At the end of line, possibly in the middle of the newline separator
+ // At the end of line
FinishField();
- if (data < data_end && data[-1] == '\r' && *data == '\n') {
- data++;
- }
- if (num_cols_ == -1) {
+ ++num_cols;
+ if (ARROW_PREDICT_FALSE(num_cols_ == -1)) {
num_cols_ = num_cols;
- } else if (num_cols != num_cols_) {
+ } else if (ARROW_PREDICT_FALSE(num_cols != num_cols_)) {
return MismatchingColumns(num_cols_, num_cols);
}
*out_data = data;
@@ -174,6 +298,7 @@ AbortLine:
// Not a full line except perhaps if in final block
if (is_final) {
FinishField();
+ ++num_cols;
if (num_cols_ == -1) {
num_cols_ = num_cols;
} else if (num_cols != num_cols_) {
@@ -183,52 +308,133 @@ AbortLine:
return Status::OK();
}
// Truncated line at end of block, rewind parsed state
- RewindState();
+ values_writer->RollbackLine();
+ parsed_writer->RollbackLine();
return Status::OK();
}
-Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final,
- uint32_t* out_size) {
- num_rows_ = 0;
- // These don't shrink the allocated capacity, so reuses of BlockParser
- // avoid most allocations when appending data
- parsed_.clear();
-#ifdef CSV_PARSER_USE_BITFIELD
- values_.clear();
- values_.push_back({0, false});
-#else
- offsets_.clear();
- offsets_.push_back(0);
- quoted_.clear();
-#endif
-
- const char* data = start;
- const char* data_end = start + size;
-
- while (data < data_end && num_rows_ < max_num_rows_) {
+template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+Status BlockParser::ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+ const char* data, const char* data_end, bool is_final,
+ int32_t rows_in_chunk, const char** out_data,
+ bool* finished_parsing) {
+ while (data < data_end && rows_in_chunk > 0) {
const char* line_end = data;
- RETURN_NOT_OK(ParseLine(data, data_end, is_final, &line_end));
+ RETURN_NOT_OK(ParseLine<SpecializedOptions>(values_writer, parsed_writer, data,
+ data_end, is_final, &line_end));
if (line_end == data) {
// Cannot parse any further
+ *finished_parsing = true;
break;
}
data = line_end;
++num_rows_;
+ --rows_in_chunk;
+ }
+ // Append new buffers and update size
+ std::shared_ptr<Buffer> values_buffer;
+ values_writer->Finish(&values_buffer);
+ if (values_buffer->size() > 0) {
+ values_size_ += static_cast<int32_t>(values_buffer->size() / sizeof(ValueDesc) - 1);
+ values_buffers_.push_back(std::move(values_buffer));
}
+ *out_data = data;
+ return Status::OK();
+}
+
+template <typename SpecializedOptions>
+Status BlockParser::DoParseSpecialized(const char* start, uint32_t size, bool is_final,
+ uint32_t* out_size) {
+ num_rows_ = 0;
+ values_size_ = 0;
+ parsed_size_ = 0;
+ values_buffers_.clear();
+ parsed_buffer_.reset();
+ parsed_ = nullptr;
+
+ const char* data = start;
+ const char* data_end = start + size;
+ bool finished_parsing = false;
+
+ PresizedParsedWriter parsed_writer(pool_, size);
+
+ if (num_cols_ == -1) {
+ // Can't presize values when the number of columns is not known, first parse
+ // a single line
+ const int32_t rows_in_chunk = 1;
+ ResizableValuesWriter values_writer(pool_);
+ values_writer.Start(parsed_writer);
+
+ RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data,
+ data_end, is_final, rows_in_chunk, &data,
+ &finished_parsing));
+ }
+ while (!finished_parsing && data < data_end && num_rows_ < max_num_rows_) {
+ // We know the number of columns, so can presize a values array for
+ // a given number of rows
+ DCHECK_GE(num_cols_, 0);
+
+ int32_t rows_in_chunk;
+ if (num_cols_ > 0) {
+ rows_in_chunk = std::min(32768 / num_cols_, max_num_rows_ - num_rows_);
+ } else {
+ rows_in_chunk = std::min(32768, max_num_rows_ - num_rows_);
+ }
+
+ PresizedValuesWriter values_writer(pool_, rows_in_chunk, num_cols_);
+ values_writer.Start(parsed_writer);
+
+ RETURN_NOT_OK(ParseChunk<SpecializedOptions>(&values_writer, &parsed_writer, data,
+ data_end, is_final, rows_in_chunk, &data,
+ &finished_parsing));
+ }
+
+ parsed_writer.Finish(&parsed_buffer_);
+ parsed_size_ = static_cast<int32_t>(parsed_buffer_->size());
+ parsed_ = parsed_buffer_->data();
+
+ DCHECK_EQ(values_size_, num_rows_ * num_cols_);
if (num_cols_ == -1) {
DCHECK_EQ(num_rows_, 0);
}
-#ifdef CSV_PARSER_USE_BITFIELD
- DCHECK_EQ(values_.size(), 1 + static_cast<size_t>(num_rows_ * num_cols_));
- DCHECK_EQ(parsed_.size(), values_[values_.size() - 1].offset);
-#else
- DCHECK_EQ(offsets_.size(), 1 + static_cast<size_t>(num_rows_ * num_cols_));
- DCHECK_EQ(parsed_.size(), offsets_[offsets_.size() - 1]);
+#ifndef NDEBUG
+ if (num_rows_ > 0) {
+ DCHECK_GT(values_buffers_.size(), 0);
+ auto& last_values_buffer = values_buffers_.back();
+ auto last_values = reinterpret_cast<const ValueDesc*>(last_values_buffer->data());
+ auto last_values_size = last_values_buffer->size() / sizeof(ValueDesc);
+ auto check_parsed_size =
+ static_cast<int32_t>(last_values[last_values_size - 1].offset);
+ DCHECK_EQ(parsed_size_, check_parsed_size);
+ } else {
+ DCHECK_EQ(parsed_size_, 0);
+ }
#endif
*out_size = static_cast<uint32_t>(data - start);
return Status::OK();
}
+Status BlockParser::DoParse(const char* start, uint32_t size, bool is_final,
+ uint32_t* out_size) {
+ if (options_.quoting) {
+ if (options_.escaping) {
+ return DoParseSpecialized<SpecializedOptions<true, true>>(start, size, is_final,
+ out_size);
+ } else {
+ return DoParseSpecialized<SpecializedOptions<true, false>>(start, size, is_final,
+ out_size);
+ }
+ } else {
+ if (options_.escaping) {
+ return DoParseSpecialized<SpecializedOptions<false, true>>(start, size, is_final,
+ out_size);
+ } else {
+ return DoParseSpecialized<SpecializedOptions<false, false>>(start, size, is_final,
+ out_size);
+ }
+ }
+}
+
Status BlockParser::Parse(const char* data, uint32_t size, uint32_t* out_size) {
return DoParse(data, size, false /* is_final */, out_size);
}
@@ -237,5 +443,12 @@ Status BlockParser::ParseFinal(const char* data, uint32_t size, uint32_t* out_si
return DoParse(data, size, true /* is_final */, out_size);
}
+BlockParser::BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols,
+ int32_t max_num_rows)
+ : pool_(pool), options_(options), num_cols_(num_cols), max_num_rows_(max_num_rows) {}
+
+BlockParser::BlockParser(ParseOptions options, int32_t num_cols, int32_t max_num_rows)
+ : BlockParser(default_memory_pool(), options, num_cols, max_num_rows) {}
+
} // namespace csv
} // namespace arrow
diff --git a/cpp/src/arrow/csv/parser.h b/cpp/src/arrow/csv/parser.h
index 4529fce..0e1a415 100644
--- a/cpp/src/arrow/csv/parser.h
+++ b/cpp/src/arrow/csv/parser.h
@@ -22,21 +22,20 @@
#include <memory>
#include <vector>
+#include "arrow/buffer.h"
#include "arrow/csv/options.h"
#include "arrow/status.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
namespace arrow {
+
+class MemoryPool;
+
namespace csv {
constexpr int32_t kMaxParserNumRows = 100000;
-// Whether BlockParser will use bitfields for better memory consumption
-// and cache locality.
-#undef CSV_PARSER_USE_BITFIELD
-#define CSV_PARSER_USE_BITFIELD
-
/// \class BlockParser
/// \brief A reusable block-based parser for CSV data
///
@@ -53,6 +52,8 @@ class ARROW_EXPORT BlockParser {
public:
explicit BlockParser(ParseOptions options, int32_t num_cols = -1,
int32_t max_num_rows = kMaxParserNumRows);
+ explicit BlockParser(MemoryPool* pool, ParseOptions options, int32_t num_cols = -1,
+ int32_t max_num_rows = kMaxParserNumRows);
/// \brief Parse a block of data
///
@@ -69,7 +70,7 @@ class ARROW_EXPORT BlockParser {
int32_t num_rows() const { return num_rows_; }
int32_t num_cols() const { return num_cols_; }
/// \brief Return the total size in bytes of parsed data
- int32_t num_bytes() const { return static_cast<int32_t>(parsed_.size()); }
+ uint32_t num_bytes() const { return parsed_size_; }
/// \brief Visit parsed values in a column
///
@@ -77,36 +78,42 @@ class ARROW_EXPORT BlockParser {
/// Status(const uint8_t* data, uint32_t size, bool quoted)
template <typename Visitor>
Status VisitColumn(int32_t col_index, Visitor&& visit) const {
-#ifdef CSV_PARSER_USE_BITFIELD
- for (int32_t pos = col_index; pos < num_cols_ * num_rows_; pos += num_cols_) {
- auto start = values_[pos].offset;
- auto stop = values_[pos + 1].offset;
- auto quoted = values_[pos + 1].quoted;
- ARROW_RETURN_NOT_OK(visit(parsed_.data() + start, stop - start, quoted));
+ for (size_t buf_index = 0; buf_index < values_buffers_.size(); ++buf_index) {
+ const auto& values_buffer = values_buffers_[buf_index];
+ const auto values = reinterpret_cast<const ValueDesc*>(values_buffer->data());
+ const auto max_pos =
+ static_cast<int32_t>(values_buffer->size() / sizeof(ValueDesc)) - 1;
+ for (int32_t pos = col_index; pos < max_pos; pos += num_cols_) {
+ auto start = values[pos].offset;
+ auto stop = values[pos + 1].offset;
+ auto quoted = values[pos + 1].quoted;
+ ARROW_RETURN_NOT_OK(visit(parsed_ + start, stop - start, quoted));
+ }
}
-#else
- for (int32_t pos = col_index; pos < num_cols_ * num_rows_; pos += num_cols_) {
- auto start = offsets_[pos];
- auto stop = offsets_[pos + 1];
- auto quoted = quoted_[pos];
- ARROW_RETURN_NOT_OK(visit(parsed_.data() + start, stop - start, quoted));
- }
-#endif
return Status::OK();
}
- // XXX add a ClearColumn method to signal that a column won't be visited anymore?
-
protected:
ARROW_DISALLOW_COPY_AND_ASSIGN(BlockParser);
Status DoParse(const char* data, uint32_t size, bool is_final, uint32_t* out_size);
+ template <typename SpecializedOptions>
+ Status DoParseSpecialized(const char* data, uint32_t size, bool is_final,
+ uint32_t* out_size);
+
+ template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+ Status ParseChunk(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+ const char* data, const char* data_end, bool is_final,
+ int32_t rows_in_chunk, const char** out_data, bool* finished_parsing);
// Parse a single line from the data pointer
- Status ParseLine(const char* data, const char* data_end, bool is_final,
+ template <typename SpecializedOptions, typename ValuesWriter, typename ParsedWriter>
+ Status ParseLine(ValuesWriter* values_writer, ParsedWriter* parsed_writer,
+ const char* data, const char* data_end, bool is_final,
const char** out_data);
- ParseOptions options_;
+ MemoryPool* pool_;
+ const ParseOptions options_;
// The number of rows parsed from the block
int32_t num_rows_;
// The number of columns (can be -1 at start)
@@ -115,22 +122,22 @@ class ARROW_EXPORT BlockParser {
int32_t max_num_rows_;
// Linear scratchpad for parsed values
- // XXX should we ensure it's padded with 8 or 16 excess zero bytes? it could help
- // with null parsing...
- // TODO should be able to presize scratch space
- std::vector<uint8_t> parsed_;
-#ifdef CSV_PARSER_USE_BITFIELD
struct ValueDesc {
uint32_t offset : 31;
bool quoted : 1;
};
- std::vector<ValueDesc> values_;
-#else
- // Value offsets inside the scratchpad
- std::vector<uint32_t> offsets_;
- // Whether each value was quoted or not
- std::vector<bool> quoted_;
-#endif
+
+ // XXX should we ensure the parsed buffer is padded with 8 or 16 excess zero bytes?
+ // It may help with null parsing...
+ std::vector<std::shared_ptr<Buffer>> values_buffers_;
+ std::shared_ptr<Buffer> parsed_buffer_;
+ const uint8_t* parsed_;
+ int32_t values_size_;
+ int32_t parsed_size_;
+
+ class ResizableValuesWriter;
+ class PresizedValuesWriter;
+ class PresizedParsedWriter;
};
} // namespace csv
diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index 730abe4..6fbbd42 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -138,7 +138,7 @@ class BaseTableReader : public csv::TableReader {
return Status::Invalid("header_rows == 0 needs explicit column names");
}
- BlockParser parser(parse_options_, num_cols_, parse_options_.header_rows);
+ BlockParser parser(pool_, parse_options_, num_cols_, parse_options_.header_rows);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser.Parse(reinterpret_cast<const char*>(cur_data_),
@@ -251,7 +251,8 @@ class SerialTableReader : public BaseTableReader {
RETURN_NOT_OK(ProcessHeader());
static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
- auto parser = std::make_shared<BlockParser>(parse_options_, num_cols_, max_num_rows);
+ auto parser =
+ std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
while (!eof_) {
// Consume current block
uint32_t parsed_size = 0;
@@ -338,8 +339,8 @@ class ThreadedTableReader : public BaseTableReader {
// "mutable" allows to modify captured by-copy chunk_buffer
task_group_->Append([=]() mutable -> Status {
- auto parser =
- std::make_shared<BlockParser>(parse_options_, num_cols_, max_num_rows);
+ auto parser = std::make_shared<BlockParser>(pool_, parse_options_, num_cols_,
+ max_num_rows);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser->Parse(reinterpret_cast<const char*>(chunk_data),
chunk_size, &parsed_size));
@@ -374,7 +375,7 @@ class ThreadedTableReader : public BaseTableReader {
builder->SetTaskGroup(task_group_);
}
auto parser =
- std::make_shared<BlockParser>(parse_options_, num_cols_, max_num_rows);
+ std::make_shared<BlockParser>(pool_, parse_options_, num_cols_, max_num_rows);
uint32_t parsed_size = 0;
RETURN_NOT_OK(parser->ParseFinal(reinterpret_cast<const char*>(cur_data_),
static_cast<uint32_t>(cur_size_), &parsed_size));