You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2021/12/02 09:51:50 UTC

[arrow] branch master updated: ARROW-14940: [C++] Speed up CSV parser with long CSV cells

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cdb80c  ARROW-14940: [C++] Speed up CSV parser with long CSV cells
6cdb80c is described below

commit 6cdb80c6cfd564163e3358dbf30ea29e7db69d26
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Thu Dec 2 10:49:58 2021 +0100

    ARROW-14940: [C++] Speed up CSV parser with long CSV cells
    
    Some CSV files may have long cells (values), for example if containing arbitrary texts or even things like timestamps.
    We can speed up parsing such CSV files by filtering multiple bytes at once for state-changing characters such as delimiters.
    
    This PR adds two kinds of bulk filters:
    - a very simple heuristic Bloom filter
    - a precise filter using SSE4.2 packed compare
    
    Given that negative filter matches have a non-trivial cost, the bulk filters are enabled only if the average cell length exceeds a given threshold.
    
    Closes #11828 from pitrou/ARROW-14940-csv-bulk-filter
    
    Authored-by: Antoine Pitrou <an...@python.org>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/csv/parser.cc | 252 +++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 235 insertions(+), 17 deletions(-)

diff --git a/cpp/src/arrow/csv/parser.cc b/cpp/src/arrow/csv/parser.cc
index 6400c94..afd5a6f 100644
--- a/cpp/src/arrow/csv/parser.cc
+++ b/cpp/src/arrow/csv/parser.cc
@@ -26,6 +26,7 @@
 #include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/simd.h"
 
 namespace arrow {
 namespace csv {
@@ -64,6 +65,127 @@ class SpecializedOptions {
   static constexpr bool escaping = Escaping;
 };
 
+//
+// Bulk filters for packed character matching.
+// These filters allow checking multiple CSV bytes at once for specific
+// characters (cell delimiter, line delimiter, escape char, etc.).
+//
+
+// Heuristic Bloom filters: 1, 2 or 4 bytes at a time.
+
+class BaseBloomFilter {
+ public:
+  explicit BaseBloomFilter(const ParseOptions& options) : filter_(MakeFilter(options)) {}
+
+ protected:
+  using FilterType = uint64_t;
+  // 63 for uint64_t
+  static constexpr uint8_t kCharMask = static_cast<uint8_t>((8 * sizeof(FilterType)) - 1);
+
+  FilterType MakeFilter(const ParseOptions& options) {
+    FilterType filter = 0;
+    auto add_char = [&](char c) { filter |= CharFilter(c); };
+    add_char('\n');
+    add_char('\r');
+    add_char(options.delimiter);
+    if (options.escaping) {
+      add_char(options.escape_char);
+    }
+    if (options.quoting) {
+      add_char(options.quote_char);
+    }
+    return filter;
+  }
+
+  // A given character value will set/test one bit in the 64-bit filter,
+  // whose bit number is taken from low bits of the character value.
+  //
+  // For example 'b' (ASCII value 98) will set/test bit #34 in the filter.
+  // If the bit is set in the filter, the given character *may* be part
+  // of the matched characters.  If the bit is unset in the filter,
+  // the the given character *cannot* be part of the matched characters.
+  FilterType CharFilter(uint8_t c) {
+    return static_cast<FilterType>(1) << (c & kCharMask);
+  }
+
+  FilterType MatchChar(uint8_t c) { return CharFilter(c) & filter_; }
+
+  const FilterType filter_;
+};
+
+class BloomFilter1B : public BaseBloomFilter {
+ public:
+  using WordType = uint8_t;
+
+  using BaseBloomFilter::BaseBloomFilter;
+
+  bool Matches(uint8_t c) { return (CharFilter(c) & filter_) != 0; }
+};
+
+class BloomFilter2B : public BaseBloomFilter {
+ public:
+  using WordType = uint16_t;
+
+  using BaseBloomFilter::BaseBloomFilter;
+
+  bool Matches(uint16_t w) {
+    return (MatchChar(static_cast<uint8_t>(w >> 8)) |
+            MatchChar(static_cast<uint8_t>(w))) != 0;
+  }
+};
+
+class BloomFilter4B : public BaseBloomFilter {
+ public:
+  using WordType = uint32_t;
+
+  using BaseBloomFilter::BaseBloomFilter;
+
+  bool Matches(uint32_t w) {
+    return (MatchChar(static_cast<uint8_t>(w >> 24)) |
+            MatchChar(static_cast<uint8_t>(w >> 16)) |
+            MatchChar(static_cast<uint8_t>(w >> 8)) |
+            MatchChar(static_cast<uint8_t>(w))) != 0;
+  }
+};
+
+#ifdef ARROW_HAVE_SSE4_2
+
+// SSE4.2 filter: 8 bytes at a time, using packed compare instruction
+
+// NOTE: on SVE, could use svmatch[_u8] for similar functionality.
+
+class SSE42Filter {
+ public:
+  using WordType = uint64_t;
+
+  explicit SSE42Filter(const ParseOptions& options) : filter_(MakeFilter(options)) {}
+
+  bool Matches(WordType w) {
+    // Look up every byte in `w` in the SIMD filter.
+    return _mm_cmpistrc(_mm_set1_epi64x(w), filter_,
+                        _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_ANY);
+  }
+
+ protected:
+  using BulkFilterType = __m128i;
+
+  BulkFilterType MakeFilter(const ParseOptions& options) {
+    // Make a SIMD word of the characters we want to match
+    const char cr = '\r';
+    const char lf = '\n';
+    const char delim = options.delimiter;
+    const char quote = options.quoting ? options.quote_char : cr;
+    const char escape = options.escaping ? options.escape_char : cr;
+
+    return _mm_set_epi8(delim, quote, escape, lf, cr, cr, cr, cr, cr, cr, cr, cr, cr, cr,
+                        cr, cr);
+  }
+
+  const BulkFilterType filter_;
+};
+
+#endif
+
 // A helper class allocating the buffer for parsed values and writing into it
 // without any further resizes, except at the end.
 class PresizedDataWriter {
@@ -86,6 +208,13 @@ class PresizedDataWriter {
     parsed_[parsed_size_++] = static_cast<uint8_t>(c);
   }
 
+  template <typename Word>
+  void PushFieldWord(Word w) {
+    DCHECK_GE(parsed_capacity_ - parsed_size_, static_cast<int64_t>(sizeof(w)));
+    memcpy(parsed_ + parsed_size_, &w, sizeof(w));
+    parsed_size_ += sizeof(w);
+  }
+
   // Rollback the state that was saved in BeginLine()
   void RollbackLine() { parsed_size_ = saved_parsed_size_; }
 
@@ -181,13 +310,21 @@ class PresizedValueDescWriter : public ValueDescWriter<PresizedValueDescWriter>
 }  // namespace
 
 class BlockParserImpl {
+#if defined(ARROW_HAVE_SSE4_2) && (defined(__x86_64__) || defined(_M_X64))
+  // (the SSE4.2 filter seems to crash on RTools with 32-bit MinGW)
+  using BulkFilterType = SSE42Filter;
+#else
+  using BulkFilterType = BloomFilter4B;
+#endif
+
  public:
   BlockParserImpl(MemoryPool* pool, ParseOptions options, int32_t num_cols,
                   int64_t first_row, int32_t max_num_rows)
       : pool_(pool),
-        options_(options),
+        options_(std::move(options)),
         first_row_(first_row),
         max_num_rows_(max_num_rows),
+        bulk_filter_(options_),
         batch_(num_cols) {}
 
   const DataBatch& parsed_batch() const { return batch_; }
@@ -230,7 +367,8 @@ class BlockParserImpl {
     return MismatchingColumns(row);
   }
 
-  template <typename SpecializedOptions, typename ValueDescWriter, typename DataWriter>
+  template <typename SpecializedOptions, bool UseBulkFilter, typename ValueDescWriter,
+            typename DataWriter>
   Status ParseLine(ValueDescWriter* values_writer, DataWriter* parsed_writer,
                    const char* data, const char* data_end, bool is_final,
                    const char** out_data) {
@@ -265,6 +403,18 @@ class BlockParserImpl {
 
   FieldStart:
     // At the start of a field
+    if (*data == options_.delimiter) {
+      // Empty cells are very common in some files, shortcut them
+      values_writer->StartField(false /* quoted */);
+      FinishField();
+      ++data;
+      ++num_cols;
+      if (ARROW_PREDICT_FALSE(data == data_end)) {
+        goto AbortLine;
+      }
+      goto FieldStart;
+    }
+
     // Quoting is only recognized at start of field
     if (SpecializedOptions::quoting &&
         ARROW_PREDICT_FALSE(*data == options_.quote_char)) {
@@ -278,9 +428,18 @@ class BlockParserImpl {
 
   InField:
     // Inside a non-quoted part of a field
-    if (ARROW_PREDICT_FALSE(data == data_end)) {
-      goto AbortLine;
+    if (UseBulkFilter) {
+      const char* bulk_end = BulkFilter(parsed_writer, data, data_end);
+      if (ARROW_PREDICT_FALSE(bulk_end == nullptr)) {
+        goto AbortLine;
+      }
+      data = bulk_end;
+    } else {
+      if (ARROW_PREDICT_FALSE(data == data_end)) {
+        goto AbortLine;
+      }
     }
+
     c = *data++;
     if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
       if (ARROW_PREDICT_FALSE(data == data_end)) {
@@ -310,8 +469,16 @@ class BlockParserImpl {
 
   InQuotedField:
     // Inside a quoted part of a field
-    if (ARROW_PREDICT_FALSE(data == data_end)) {
-      goto AbortLine;
+    if (UseBulkFilter) {
+      const char* bulk_end = BulkFilter(parsed_writer, data, data_end);
+      if (ARROW_PREDICT_FALSE(bulk_end == nullptr)) {
+        goto AbortLine;
+      }
+      data = bulk_end;
+    } else {
+      if (ARROW_PREDICT_FALSE(data == data_end)) {
+        goto AbortLine;
+      }
     }
     c = *data++;
     if (SpecializedOptions::escaping && ARROW_PREDICT_FALSE(c == options_.escape_char)) {
@@ -387,24 +554,72 @@ class BlockParserImpl {
     return Status::OK();
   }
 
+  template <typename DataWriter>
+  const char* BulkFilter(DataWriter* data_writer, const char* data,
+                         const char* data_end) {
+    while (true) {
+      using WordType = BulkFilterType::WordType;
+
+      if (ARROW_PREDICT_FALSE(static_cast<size_t>(data_end - data) < sizeof(WordType))) {
+        if (ARROW_PREDICT_FALSE(data == data_end)) {
+          return nullptr;
+        }
+        return data;
+      }
+      WordType word;
+      memcpy(&word, data, sizeof(WordType));
+      if (bulk_filter_.Matches(word)) {
+        return data;
+      }
+      // No special chars
+      data_writer->PushFieldWord(word);
+      data += sizeof(WordType);
+    }
+  }
+
   template <typename SpecializedOptions, typename ValueDescWriter, typename DataWriter>
   Status ParseChunk(ValueDescWriter* values_writer, DataWriter* parsed_writer,
                     const char* data, const char* data_end, bool is_final,
                     int32_t rows_in_chunk, const char** out_data,
                     bool* finished_parsing) {
-    int32_t num_rows_deadline = batch_.num_rows_ + rows_in_chunk;
-
-    while (data < data_end && batch_.num_rows_ < num_rows_deadline) {
-      const char* line_end = data;
-      RETURN_NOT_OK(ParseLine<SpecializedOptions>(values_writer, parsed_writer, data,
-                                                  data_end, is_final, &line_end));
-      if (line_end == data) {
-        // Cannot parse any further
-        *finished_parsing = true;
-        break;
+    const int32_t start_num_rows = batch_.num_rows_;
+    const int32_t num_rows_deadline = batch_.num_rows_ + rows_in_chunk;
+
+    if (use_bulk_filter_) {
+      while (data < data_end && batch_.num_rows_ < num_rows_deadline) {
+        const char* line_end = data;
+        RETURN_NOT_OK((ParseLine<SpecializedOptions, true>(
+            values_writer, parsed_writer, data, data_end, is_final, &line_end)));
+        if (line_end == data) {
+          // Cannot parse any further
+          *finished_parsing = true;
+          break;
+        }
+        data = line_end;
       }
-      data = line_end;
+    } else {
+      while (data < data_end && batch_.num_rows_ < num_rows_deadline) {
+        const char* line_end = data;
+        RETURN_NOT_OK((ParseLine<SpecializedOptions, false>(
+            values_writer, parsed_writer, data, data_end, is_final, &line_end)));
+        if (line_end == data) {
+          // Cannot parse any further
+          *finished_parsing = true;
+          break;
+        }
+        data = line_end;
+      }
+    }
+
+    if (batch_.num_rows_ > start_num_rows && batch_.num_cols_ > 0) {
+      // Use bulk filter only if average value length is >= 10 bytes,
+      // as the bulk filter has a fixed cost that isn't compensated
+      // when values are too short.
+      const int64_t bulk_filter_threshold =
+          batch_.num_cols_ * (batch_.num_rows_ - start_num_rows) * 10;
+      use_bulk_filter_ = (data - *out_data) > bulk_filter_threshold;
     }
+
     // Append new buffers and update size
     std::shared_ptr<Buffer> values_buffer;
     values_writer->Finish(&values_buffer);
@@ -539,6 +754,9 @@ class BlockParserImpl {
   // The maximum number of rows to parse from a block
   int32_t max_num_rows_;
 
+  BulkFilterType bulk_filter_;
+  bool use_bulk_filter_ = false;
+
   // Unparsed data size
   int32_t values_size_;
   // Parsed data batch