You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/07/15 20:57:46 UTC

[GitHub] [arrow] bkietz commented on a change in pull request #10568: ARROW-11889: [C++] Add parallelism to streaming CSV reader

bkietz commented on a change in pull request #10568:
URL: https://github.com/apache/arrow/pull/10568#discussion_r669094247



##########
File path: cpp/src/arrow/csv/column_decoder.cc
##########
@@ -142,49 +64,38 @@ class ConcreteColumnDecoder : public ColumnDecoder {
 
   MemoryPool* pool_;
   int32_t col_index_;
-
-  std::vector<Future<std::shared_ptr<Array>>> chunks_;
-  int64_t num_chunks_;
-  int64_t next_chunk_;
-
-  std::mutex mutex_;
+  internal::Executor* executor_;
 };
 
 //////////////////////////////////////////////////////////////////////////
 // Null column decoder implementation (for a column not in the CSV file)
 
 class NullColumnDecoder : public ConcreteColumnDecoder {
  public:
-  explicit NullColumnDecoder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
-                             const std::shared_ptr<internal::TaskGroup>& task_group)
-      : ConcreteColumnDecoder(pool, task_group), type_(type) {}
+  explicit NullColumnDecoder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : ConcreteColumnDecoder(pool), type_(type) {}
 
-  void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
+  Future<std::shared_ptr<Array>> Decode(
+      const std::shared_ptr<BlockParser>& parser) override;
 
  protected:
   std::shared_ptr<DataType> type() const override { return type_; }
 
   std::shared_ptr<DataType> type_;
 };
 
-void NullColumnDecoder::Insert(int64_t block_index,
-                               const std::shared_ptr<BlockParser>& parser) {
-  PrepareChunk(block_index);
-
+Future<std::shared_ptr<Array>> NullColumnDecoder::Decode(
+    const std::shared_ptr<BlockParser>& parser) {
   // Spawn a task that will build an array of nulls with the right DataType
   const int32_t num_rows = parser->num_rows();
   DCHECK_GE(num_rows, 0);
 
-  task_group_->Append([=]() -> Status {
-    std::unique_ptr<ArrayBuilder> builder;
-    RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
-    std::shared_ptr<Array> array;
-    RETURN_NOT_OK(builder->AppendNulls(num_rows));
-    RETURN_NOT_OK(builder->Finish(&array));
-
-    SetChunk(block_index, array);
-    return Status::OK();
-  });
+  std::unique_ptr<ArrayBuilder> builder;
+  RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
+  std::shared_ptr<Array> array;
+  RETURN_NOT_OK(builder->AppendNulls(num_rows));
+  RETURN_NOT_OK(builder->Finish(&array));
+  return Future<std::shared_ptr<Array>>::MakeFinished(std::move(array));

Review comment:
   ```suggestion
     DCHECK_GE(parser->num_rows(), 0);
     return MakeArrayOfNull(type_, parser->num_rows(), pool_);
   ```

##########
File path: cpp/src/arrow/csv/column_decoder.cc
##########
@@ -283,84 +188,62 @@ Result<std::shared_ptr<Array>> InferringColumnDecoder::RunInference(
     // (no one else should be updating converter_ concurrently)
     auto maybe_array = converter_->Convert(*parser, col_index_);
 
-    std::unique_lock<std::mutex> lock(mutex_);
     if (maybe_array.ok() || !infer_status_.can_loosen_type()) {
       // Conversion succeeded, or failed definitively
+      DCHECK(!type_frozen_);
+      type_frozen_ = true;
       return maybe_array;
     }
     // Conversion failed temporarily, try another type
     infer_status_.LoosenType(maybe_array.status());
-    RETURN_NOT_OK(UpdateType());
+    auto update_status = UpdateType();
+    if (!update_status.ok()) {
+      return update_status;
+    }
   }
 }
 
-void InferringColumnDecoder::Insert(int64_t block_index,
-                                    const std::shared_ptr<BlockParser>& parser) {
-  PrepareChunk(block_index);
-
+Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode(
+    const std::shared_ptr<BlockParser>& parser) {
+  bool already_taken = first_inferrer_.fetch_or(1);
   // First block: run inference
-  if (block_index == 0) {
-    task_group_->Append([=]() -> Status {
-      auto maybe_array = RunInference(parser);
-
-      std::unique_lock<std::mutex> lock(mutex_);
-      DCHECK(!type_frozen_);
-      type_frozen_ = true;
-      SetChunkUnlocked(block_index, std::move(maybe_array));
-      return Status::OK();
-    });
-    return;
+  if (!already_taken) {
+    auto maybe_array = RunInference(parser);
+    first_inference_run_.MarkFinished();
+    return Future<std::shared_ptr<Array>>::MakeFinished(maybe_array);

Review comment:
   ```suggestion
       return Future<std::shared_ptr<Array>>::MakeFinished(std::move(maybe_array));
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -349,6 +350,182 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  // Represents the number of input bytes represented by this batch
+  // This will include bytes skipped when skipping rows after the header
+  int64_t bytes_processed;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+template <>
+struct IterationTraits<csv::DecodedBlock> {
+  static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
+  static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
+};
+
+namespace csv {
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned
+//
+// This operator is not re-entrant
+class BlockParsingOperator {
+ public:
+  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
+                       int num_csv_cols, bool count_rows)
+      : io_context_(io_context),
+        parse_options_(parse_options),
+        num_csv_cols_(num_csv_cols),
+        count_rows_(count_rows) {}
+
+  Result<ParsedBlock> operator()(const CSVBlock& block) {
+    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
+    auto parser = std::make_shared<BlockParser>(
+        io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
+
+    std::shared_ptr<Buffer> straddling;
+    std::vector<util::string_view> views;
+    if (block.partial->size() != 0 || block.completion->size() != 0) {
+      if (block.partial->size() == 0) {
+        straddling = block.completion;
+      } else if (block.completion->size() == 0) {
+        straddling = block.partial;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            straddling,
+            ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
+      }
+      views = {util::string_view(*straddling), util::string_view(*block.buffer)};
+    } else {
+      views = {util::string_view(*block.buffer)};
+    }
+    uint32_t parsed_size;
+    if (block.is_final) {
+      RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
+    } else {
+      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
+    }
+    if (count_rows_) {
+      num_rows_seen_ += parser->num_rows();
+    }
+    RETURN_NOT_OK(block.consume_bytes(parsed_size));
+    return ParsedBlock{std::move(parser), block.block_index,
+                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
+  }
+
+ private:
+  io::IOContext io_context_;
+  ParseOptions parse_options_;
+  int num_csv_cols_;
+  bool count_rows_;
+  int num_rows_seen_ = 0;
+};
+
+class BlockDecodingOperator {
+ public:
+  Future<DecodedBlock> operator()(const ParsedBlock& block) {
+    DCHECK(!state_->column_decoders.empty());
+    std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs;
+    for (auto& decoder : state_->column_decoders) {
+      decoded_array_futs.push_back(decoder->Decode(block.parser));
+    }
+    auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped;
+    auto decoded_arrays_fut = All(decoded_array_futs);
+    auto state = state_;
+    return decoded_arrays_fut.Then(
+        [state, bytes_parsed_or_skipped](
+            const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
+            -> Result<DecodedBlock> {
+          ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
+                                internal::UnwrapOrRaise(maybe_decoded_arrays));
+
+          ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(decoded_arrays));

Review comment:
   ```suggestion
             ARROW_ASSIGN_OR_RAISE(auto batch,
                                   state->DecodedArraysToBatch(std::move(decoded_arrays)));
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -1139,8 +1177,9 @@ class CSVRowCounter : public ReaderMixin,
   }
 
   Future<int64_t> DoCount(const std::shared_ptr<CSVRowCounter>& self) {
-    // We must return a value instead of Status/Future<> to work with MakeMappedGenerator,
-    // and we must use a type with a valid end value to work with IterationEnd.
+    // We must return a value instead of Status/Future<> to work with
+    // MakeMappedGenerator, and we must use a type with a valid end value to work with

Review comment:
   ```suggestion
       // count_cb must return a value instead of Status/Future<> to work with
       // MakeMappedGenerator, and it must use a type with a valid end value to work with
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -642,263 +822,119 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
 /////////////////////////////////////////////////////////////////////////
 // Base class for streaming readers
 
-class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
+class StreamingReaderImpl : public ReaderMixin,
+                            public csv::StreamingReader,
+                            public std::enable_shared_from_this<StreamingReaderImpl> {
  public:
-  BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
-                      std::shared_ptr<io::InputStream> input,
+  StreamingReaderImpl(io::IOContext io_context, std::shared_ptr<io::InputStream> input,
                       const ReadOptions& read_options, const ParseOptions& parse_options,
                       const ConvertOptions& convert_options, bool count_rows)
       : ReaderMixin(io_context, std::move(input), read_options, parse_options,
                     convert_options, count_rows),
-        cpu_executor_(cpu_executor) {}
-
-  virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
-
-  std::shared_ptr<Schema> schema() const override { return schema_; }
-
-  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
-    auto next_fut = ReadNextAsync();
-    auto next_result = next_fut.result();
-    return std::move(next_result).Value(batch);
-  }
-
- protected:
-  // Make column decoders from conversion schema
-  Status MakeColumnDecoders() {
-    for (const auto& column : conversion_schema_.columns) {
-      std::shared_ptr<ColumnDecoder> decoder;
-      if (column.is_missing) {
-        ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context_.pool(),
-                                                               column.type, task_group_));
-      } else if (column.type != nullptr) {
-        ARROW_ASSIGN_OR_RAISE(
-            decoder, ColumnDecoder::Make(io_context_.pool(), column.type, column.index,
-                                         convert_options_, task_group_));
-      } else {
-        ARROW_ASSIGN_OR_RAISE(decoder,
-                              ColumnDecoder::Make(io_context_.pool(), column.index,
-                                                  convert_options_, task_group_));
-      }
-      column_decoders_.push_back(std::move(decoder));
-    }
-    return Status::OK();
-  }
+        bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {}
 
-  Result<int64_t> ParseAndInsert(const std::shared_ptr<Buffer>& partial,
-                                 const std::shared_ptr<Buffer>& completion,
-                                 const std::shared_ptr<Buffer>& block,
-                                 int64_t block_index, bool is_final) {
-    ARROW_ASSIGN_OR_RAISE(auto result,
-                          Parse(partial, completion, block, block_index, is_final));
-    RETURN_NOT_OK(ProcessData(result.parser, block_index));
-    return result.parsed_bytes;
-  }
-
-  // Trigger conversion of parsed block data
-  Status ProcessData(const std::shared_ptr<BlockParser>& parser, int64_t block_index) {
-    for (auto& decoder : column_decoders_) {
-      decoder->Insert(block_index, parser);
-    }
-    return Status::OK();
-  }
-
-  Result<std::shared_ptr<RecordBatch>> DecodeNextBatch() {
-    DCHECK(!column_decoders_.empty());
-    ArrayVector arrays;
-    arrays.reserve(column_decoders_.size());
-    Status st;
-    for (auto& decoder : column_decoders_) {
-      auto maybe_array = decoder->NextChunk();
-      if (!maybe_array.ok()) {
-        // If there's an error, still fetch results from other decoders to
-        // keep them in sync.
-        st &= maybe_array.status();
-      } else {
-        arrays.push_back(*std::move(maybe_array));
-      }
-    }
-    RETURN_NOT_OK(st);
-    DCHECK_EQ(arrays.size(), column_decoders_.size());
-    const bool is_null = (arrays[0] == nullptr);
-#ifndef NDEBUG
-    for (const auto& array : arrays) {
-      DCHECK_EQ(array == nullptr, is_null);
-    }
-#endif
-    if (is_null) {
-      eof_ = true;
-      return nullptr;
-    }
-
-    if (schema_ == nullptr) {
-      FieldVector fields(arrays.size());
-      for (size_t i = 0; i < arrays.size(); ++i) {
-        fields[i] = field(conversion_schema_.columns[i].name, arrays[i]->type());
-      }
-      schema_ = arrow::schema(std::move(fields));
-    }
-    const auto n_rows = arrays[0]->length();
-    return RecordBatch::Make(schema_, n_rows, std::move(arrays));
-  }
-
-  // Column decoders (in ConversionSchema order)
-  std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
-  std::shared_ptr<Schema> schema_;
-  std::shared_ptr<RecordBatch> pending_batch_;
-  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
-  Executor* cpu_executor_;
-  bool eof_ = false;
-};
-
-/////////////////////////////////////////////////////////////////////////
-// Serial StreamingReader implementation
-
-class SerialStreamingReader : public BaseStreamingReader,
-                              public std::enable_shared_from_this<SerialStreamingReader> {
- public:
-  using BaseStreamingReader::BaseStreamingReader;
-
-  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
+  Future<> Init(Executor* cpu_executor) {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, read_options_.block_size));
 
     // TODO Consider exposing readahead as a read option (ARROW-12090)
     ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
                                                               io_context_.executor()));
 
-    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
+    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor);
 
-    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
-    task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
+    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));
 
+    int max_readahead = cpu_executor->GetCapacity();
     auto self = shared_from_this();
-    // Read schema from first batch
-    return ReadNextAsync(true).Then(
-        [self](const std::shared_ptr<RecordBatch>& first_batch)
-            -> Result<std::shared_ptr<csv::StreamingReader>> {
-          self->pending_batch_ = first_batch;
-          DCHECK_NE(self->schema_, nullptr);
-          return self;
-        });
-  }
 
-  Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
-    auto maybe_batch = DecodeNextBatch();
-    if (schema_ == nullptr && maybe_batch.ok()) {
-      schema_ = (*maybe_batch)->schema();
-    }
-    return maybe_batch;
+    return buffer_generator().Then([self, buffer_generator, max_readahead](
+                                       const std::shared_ptr<Buffer>& first_buffer) {
+      return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead);
+    });
   }
 
-  Future<std::shared_ptr<RecordBatch>> DoReadNext(
-      std::shared_ptr<SerialStreamingReader> self) {
-    auto batch = std::move(pending_batch_);
-    if (batch != nullptr) {
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-    }
+  std::shared_ptr<Schema> schema() const override { return schema_; }
 
-    if (!source_eof_) {
-      return block_generator_()
-          .Then([self](const CSVBlock& maybe_block) -> Status {
-            if (!IsIterationEnd(maybe_block)) {
-              self->bytes_parsed_ += maybe_block.bytes_skipped;
-              self->last_block_index_ = maybe_block.block_index;
-              auto maybe_parsed = self->ParseAndInsert(
-                  maybe_block.partial, maybe_block.completion, maybe_block.buffer,
-                  maybe_block.block_index, maybe_block.is_final);
-              if (!maybe_parsed.ok()) {
-                // Parse error => bail out
-                self->eof_ = true;
-                return maybe_parsed.status();
-              }
-              self->bytes_parsed_ += *maybe_parsed;
-              RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
-            } else {
-              self->source_eof_ = true;
-              for (auto& decoder : self->column_decoders_) {
-                decoder->SetEOF(self->last_block_index_ + 1);
-              }
-            }
-            return Status::OK();
-          })
-          .Then([self]() -> Result<std::shared_ptr<RecordBatch>> {
-            return self->DecodeBatchAndUpdateSchema();
-          });
-    }
-    return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
-        DecodeBatchAndUpdateSchema());
-  }
+  int64_t bytes_read() const override { return bytes_decoded_->load(); }
 
-  Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
-      std::shared_ptr<SerialStreamingReader> self, bool internal_read) {
-    return DoReadNext(self).Then(
-        [self, internal_read](const std::shared_ptr<RecordBatch>& batch) {
-          if (batch != nullptr && batch->num_rows() == 0) {
-            return self->ReadNextSkippingEmpty(self, internal_read);
-          }
-          if (!internal_read) {
-            self->bytes_decoded_ += self->bytes_parsed_;
-            self->bytes_parsed_ = 0;
-          }
-          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
-        });
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    auto next_fut = ReadNextAsync();
+    auto next_result = next_fut.result();
+    return std::move(next_result).Value(batch);
   }
 
   Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
-    return ReadNextAsync(false);
-  };
-
-  int64_t bytes_read() const override { return bytes_decoded_; }
+    return record_batch_gen_();
+  }
 
  protected:
-  Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
-    return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
-      if (first_buffer == nullptr) {
-        return Status::Invalid("Empty CSV file");
-      }
-      auto own_first_buffer = first_buffer;
-      auto start = own_first_buffer->data();
-      RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
-      self->bytes_decoded_ = own_first_buffer->data() - start;
-      RETURN_NOT_OK(self->MakeColumnDecoders());
-
-      self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
-          std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
-          std::move(own_first_buffer), self->read_options_.skip_rows_after_names);
-      return Status::OK();
+  Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer,
+                                AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
+                                int max_readahead) {
+    if (first_buffer == nullptr) {
+      return Status::Invalid("Empty CSV file");
+    }
+
+    std::shared_ptr<Buffer> after_header;
+    ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
+                          ProcessHeader(first_buffer, &after_header));
+    bytes_decoded_->fetch_add(header_bytes_consumed);

Review comment:
       Nit: could you space these out a little?

##########
File path: cpp/src/arrow/csv/column_decoder.cc
##########
@@ -283,84 +188,62 @@ Result<std::shared_ptr<Array>> InferringColumnDecoder::RunInference(
     // (no one else should be updating converter_ concurrently)
     auto maybe_array = converter_->Convert(*parser, col_index_);
 
-    std::unique_lock<std::mutex> lock(mutex_);
     if (maybe_array.ok() || !infer_status_.can_loosen_type()) {
       // Conversion succeeded, or failed definitively
+      DCHECK(!type_frozen_);
+      type_frozen_ = true;
       return maybe_array;
     }
     // Conversion failed temporarily, try another type
     infer_status_.LoosenType(maybe_array.status());
-    RETURN_NOT_OK(UpdateType());
+    auto update_status = UpdateType();
+    if (!update_status.ok()) {
+      return update_status;
+    }
   }
 }
 
-void InferringColumnDecoder::Insert(int64_t block_index,
-                                    const std::shared_ptr<BlockParser>& parser) {
-  PrepareChunk(block_index);
-
+Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode(
+    const std::shared_ptr<BlockParser>& parser) {
+  bool already_taken = first_inferrer_.fetch_or(1);
   // First block: run inference
-  if (block_index == 0) {
-    task_group_->Append([=]() -> Status {
-      auto maybe_array = RunInference(parser);
-
-      std::unique_lock<std::mutex> lock(mutex_);
-      DCHECK(!type_frozen_);
-      type_frozen_ = true;
-      SetChunkUnlocked(block_index, std::move(maybe_array));
-      return Status::OK();
-    });
-    return;
+  if (!already_taken) {
+    auto maybe_array = RunInference(parser);
+    first_inference_run_.MarkFinished();
+    return Future<std::shared_ptr<Array>>::MakeFinished(maybe_array);
   }
 
   // Non-first block: wait for inference to finish on first block now,
   // without blocking a TaskGroup thread.
-  {
-    std::unique_lock<std::mutex> lock(mutex_);
-    PrepareChunkUnlocked(0);
-    WaitForChunkUnlocked(0);
-    if (!chunks_[0].status().ok()) {
-      // Failed converting first chunk: bail out by marking EOF,
-      // because we can't decide a type for the other chunks.
-      SetChunkUnlocked(block_index, std::shared_ptr<Array>());
-    }
+  return first_inference_run_.Then([this, parser] {
     DCHECK(type_frozen_);
-  }
-
-  // Then use the inferred type to convert this block.
-  task_group_->Append([=]() -> Status {
     auto maybe_array = converter_->Convert(*parser, col_index_);
-
-    SetChunk(block_index, std::move(maybe_array));
-    return Status::OK();
+    return maybe_array;

Review comment:
   ```suggestion
       return converter_->Convert(*parser, col_index_);
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -349,6 +350,182 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  // Represents the number of input bytes represented by this batch
+  // This will include bytes skipped when skipping rows after the header
+  int64_t bytes_processed;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+template <>
+struct IterationTraits<csv::DecodedBlock> {
+  static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
+  static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
+};
+
+namespace csv {
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned
+//
+// This operator is not re-entrant
+class BlockParsingOperator {
+ public:
+  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
+                       int num_csv_cols, bool count_rows)
+      : io_context_(io_context),
+        parse_options_(parse_options),
+        num_csv_cols_(num_csv_cols),
+        count_rows_(count_rows) {}
+
+  Result<ParsedBlock> operator()(const CSVBlock& block) {
+    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
+    auto parser = std::make_shared<BlockParser>(
+        io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
+
+    std::shared_ptr<Buffer> straddling;
+    std::vector<util::string_view> views;
+    if (block.partial->size() != 0 || block.completion->size() != 0) {
+      if (block.partial->size() == 0) {
+        straddling = block.completion;
+      } else if (block.completion->size() == 0) {
+        straddling = block.partial;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            straddling,
+            ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
+      }
+      views = {util::string_view(*straddling), util::string_view(*block.buffer)};
+    } else {
+      views = {util::string_view(*block.buffer)};
+    }
+    uint32_t parsed_size;
+    if (block.is_final) {
+      RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
+    } else {
+      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
+    }
+    if (count_rows_) {
+      num_rows_seen_ += parser->num_rows();
+    }
+    RETURN_NOT_OK(block.consume_bytes(parsed_size));
+    return ParsedBlock{std::move(parser), block.block_index,
+                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
+  }
+
+ private:
+  io::IOContext io_context_;
+  ParseOptions parse_options_;
+  int num_csv_cols_;
+  bool count_rows_;
+  int num_rows_seen_ = 0;
+};
+
+class BlockDecodingOperator {
+ public:
+  Future<DecodedBlock> operator()(const ParsedBlock& block) {
+    DCHECK(!state_->column_decoders.empty());
+    std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs;
+    for (auto& decoder : state_->column_decoders) {
+      decoded_array_futs.push_back(decoder->Decode(block.parser));
+    }
+    auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped;
+    auto decoded_arrays_fut = All(decoded_array_futs);
+    auto state = state_;
+    return decoded_arrays_fut.Then(
+        [state, bytes_parsed_or_skipped](
+            const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays)
+            -> Result<DecodedBlock> {
+          ARROW_ASSIGN_OR_RAISE(auto decoded_arrays,
+                                internal::UnwrapOrRaise(maybe_decoded_arrays));
+
+          ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(decoded_arrays));
+          return DecodedBlock{std::move(batch), bytes_parsed_or_skipped};
+        });
+  }
+
+  static Result<BlockDecodingOperator> Make(io::IOContext io_context,
+                                            ConvertOptions convert_options,
+                                            ConversionSchema conversion_schema) {
+    BlockDecodingOperator op(std::move(io_context), std::move(convert_options),
+                             std::move(conversion_schema));
+    RETURN_NOT_OK(op.state_->MakeColumnDecoders(io_context));
+    return op;
+  }
+
+ private:
+  BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options,
+                        ConversionSchema conversion_schema)
+      : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options),
+                                       std::move(conversion_schema))) {}
+
+  struct State {
+    State(io::IOContext io_context, ConvertOptions convert_options,
+          ConversionSchema conversion_schema)
+        : convert_options(std::move(convert_options)),
+          conversion_schema(std::move(conversion_schema)) {}
+
+    Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch(
+        std::vector<std::shared_ptr<Array>>& arrays) {

Review comment:
   ```suggestion
           std::vector<std::shared_ptr<Array>> arrays) {
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -349,6 +350,182 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  // Represents the number of input bytes represented by this batch
+  // This will include bytes skipped when skipping rows after the header
+  int64_t bytes_processed;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+template <>
+struct IterationTraits<csv::DecodedBlock> {
+  static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
+  static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
+};
+
+namespace csv {
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned

Review comment:
   ```suggestion
   // A function object that takes in a buffer of CSV data and returns a parsed batch of CSV
   // data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator.
   // The parsed batch contains a list of offsets for each of the columns so that columns
   // can be individually scanned
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -349,6 +350,182 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  // Represents the number of input bytes represented by this batch
+  // This will include bytes skipped when skipping rows after the header
+  int64_t bytes_processed;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+template <>
+struct IterationTraits<csv::DecodedBlock> {
+  static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
+  static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
+};
+
+namespace csv {
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned
+//
+// This operator is not re-entrant
+class BlockParsingOperator {
+ public:
+  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
+                       int num_csv_cols, bool count_rows)
+      : io_context_(io_context),
+        parse_options_(parse_options),
+        num_csv_cols_(num_csv_cols),
+        count_rows_(count_rows) {}
+
+  Result<ParsedBlock> operator()(const CSVBlock& block) {
+    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
+    auto parser = std::make_shared<BlockParser>(
+        io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
+
+    std::shared_ptr<Buffer> straddling;
+    std::vector<util::string_view> views;
+    if (block.partial->size() != 0 || block.completion->size() != 0) {
+      if (block.partial->size() == 0) {
+        straddling = block.completion;
+      } else if (block.completion->size() == 0) {
+        straddling = block.partial;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            straddling,
+            ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
+      }
+      views = {util::string_view(*straddling), util::string_view(*block.buffer)};
+    } else {
+      views = {util::string_view(*block.buffer)};
+    }
+    uint32_t parsed_size;
+    if (block.is_final) {
+      RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
+    } else {
+      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
+    }
+    if (count_rows_) {
+      num_rows_seen_ += parser->num_rows();
+    }
+    RETURN_NOT_OK(block.consume_bytes(parsed_size));
+    return ParsedBlock{std::move(parser), block.block_index,
+                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
+  }
+
+ private:
+  io::IOContext io_context_;
+  ParseOptions parse_options_;
+  int num_csv_cols_;
+  bool count_rows_;
+  int num_rows_seen_ = 0;
+};
+
+class BlockDecodingOperator {

Review comment:
       ```suggestion
   // A function object that takes in a parsed batch of CSV data and decodes it to an arrow
   // record batch (ParsedBlock -> DecodedBlock) for use with MakeMappedGenerator.
   class BlockDecodingOperator {
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -349,6 +350,182 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  // Represents the number of input bytes represented by this batch
+  // This will include bytes skipped when skipping rows after the header
+  int64_t bytes_processed;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+template <>
+struct IterationTraits<csv::DecodedBlock> {
+  static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
+  static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
+};
+
+namespace csv {
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned
+//
+// This operator is not re-entrant
+class BlockParsingOperator {
+ public:
+  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
+                       int num_csv_cols, bool count_rows)
+      : io_context_(io_context),
+        parse_options_(parse_options),
+        num_csv_cols_(num_csv_cols),
+        count_rows_(count_rows) {}
+
+  Result<ParsedBlock> operator()(const CSVBlock& block) {
+    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();

Review comment:
       ```suggestion
       constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
   ```

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -349,6 +350,182 @@ class ThreadedBlockReader : public BlockReader {
   }
 };
 
+struct ParsedBlock {
+  std::shared_ptr<BlockParser> parser;
+  int64_t block_index;
+  int64_t bytes_parsed_or_skipped;
+};
+
+struct DecodedBlock {
+  std::shared_ptr<RecordBatch> record_batch;
+  // Represents the number of input bytes represented by this batch
+  // This will include bytes skipped when skipping rows after the header
+  int64_t bytes_processed;
+};
+
+}  // namespace
+
+}  // namespace csv
+
+template <>
+struct IterationTraits<csv::ParsedBlock> {
+  static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; }
+  static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; }
+};
+
+template <>
+struct IterationTraits<csv::DecodedBlock> {
+  static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; }
+  static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; }
+};
+
+namespace csv {
+namespace {
+
+// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data.
+// The parsed batch contains a list of offsets for each of the columns so that columns
+// can be individually scanned
+//
+// This operator is not re-entrant
+class BlockParsingOperator {
+ public:
+  BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options,
+                       int num_csv_cols, bool count_rows)
+      : io_context_(io_context),
+        parse_options_(parse_options),
+        num_csv_cols_(num_csv_cols),
+        count_rows_(count_rows) {}
+
+  Result<ParsedBlock> operator()(const CSVBlock& block) {
+    static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max();
+    auto parser = std::make_shared<BlockParser>(
+        io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows);
+
+    std::shared_ptr<Buffer> straddling;
+    std::vector<util::string_view> views;
+    if (block.partial->size() != 0 || block.completion->size() != 0) {
+      if (block.partial->size() == 0) {
+        straddling = block.completion;
+      } else if (block.completion->size() == 0) {
+        straddling = block.partial;
+      } else {
+        ARROW_ASSIGN_OR_RAISE(
+            straddling,
+            ConcatenateBuffers({block.partial, block.completion}, io_context_.pool()));
+      }
+      views = {util::string_view(*straddling), util::string_view(*block.buffer)};
+    } else {
+      views = {util::string_view(*block.buffer)};
+    }
+    uint32_t parsed_size;
+    if (block.is_final) {
+      RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size));
+    } else {
+      RETURN_NOT_OK(parser->Parse(views, &parsed_size));
+    }
+    if (count_rows_) {
+      num_rows_seen_ += parser->num_rows();
+    }
+    RETURN_NOT_OK(block.consume_bytes(parsed_size));
+    return ParsedBlock{std::move(parser), block.block_index,
+                       static_cast<int64_t>(parsed_size) + block.bytes_skipped};
+  }
+
+ private:
+  io::IOContext io_context_;
+  ParseOptions parse_options_;
+  int num_csv_cols_;
+  bool count_rows_;
+  int num_rows_seen_ = 0;
+};
+
+class BlockDecodingOperator {
+ public:
+  Future<DecodedBlock> operator()(const ParsedBlock& block) {
+    DCHECK(!state_->column_decoders.empty());
+    std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs;
+    for (auto& decoder : state_->column_decoders) {
+      decoded_array_futs.push_back(decoder->Decode(block.parser));
+    }
+    auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped;
+    auto decoded_arrays_fut = All(decoded_array_futs);

Review comment:
       ```suggestion
       auto decoded_arrays_fut = All(std::move(decoded_array_futs));
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org