You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/19 22:24:14 UTC

[GitHub] [arrow] marsupialtail opened a new pull request, #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

marsupialtail opened a new pull request, #13931:
URL: https://github.com/apache/arrow/pull/13931

   Draft.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968570337


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -24,6 +24,7 @@
 
 #include "arrow/io/type_fwd.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/async_generator.h"

Review Comment:
   This is a too heavy inclusion, should instead just use the corresponding `std::function<...>` definition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r950606781


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -545,9 +546,10 @@ class BlockDecodingOperator {
 /////////////////////////////////////////////////////////////////////////
 // Base class for common functionality
 
+template <typename T = std::shared_ptr<io::InputStream>>

Review Comment:
   I'm not sure this makes sense as a default parameter.  A reader probably wouldn't intuitively guess this.  Let's just force it to be specified.



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,181 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+
+
+// this is just like a MapGenerator but the map fun returns a thing instead of a future
+template <typename T, typename ApplyFn,
+          typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+          typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun, internal::Executor* cpu_exec) {

Review Comment:
   I think this should actually go in `async_generator.h`.  I think I told you otherwise once before but I was thinking it was going to be more tailored to the CSV reader.  This seems pretty generic.  Also, it should ideally be independently unit tested with a few tests.



##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -184,16 +186,45 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
   auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
 #endif
   ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
-  ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
   const auto& path = source.path();
-  ARROW_ASSIGN_OR_RAISE(
+
+  
+  auto actual_compression = Compression::type::UNCOMPRESSED;
+    // Guess compression from file extension
+  auto extension = fs::internal::GetAbstractPathExtension(path);
+  if (extension == "gz") {
+    actual_compression = Compression::type::GZIP;
+  } else {
+    auto maybe_compression = util::Codec::GetCompressionType(extension);
+    if (maybe_compression.ok()) {
+      ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
+    }
+  }
+  
+  Future<std::shared_ptr<csv::StreamingReader>> reader_fut;
+
+  if (actual_compression == Compression::type::UNCOMPRESSED) {
+    ARROW_ASSIGN_OR_RAISE(auto input, source.Open() )
+    reader_fut = DeferNotOk(input->io_context().executor()->Submit(
+      [=]() -> Future<std::shared_ptr<csv::StreamingReader>> {
+        ARROW_ASSIGN_OR_RAISE(auto temp_first_block, input->ReadAt(0, reader_options.block_size));
+        RETURN_NOT_OK(input->Seek(0));

Review Comment:
   Can you create a follow-up JIRA to remove this double-read?  I don't think we need to worry about it now but I'd like to get away from it at some point.



##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -343,5 +344,8 @@ ARROW_EXPORT
 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
     std::shared_ptr<InputStream> stream, int64_t block_size);
 
+ARROW_EXPORT
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(std::shared_ptr<RandomAccessFile> file, int64_t block_size);

Review Comment:
   We will want to document this to point out that it is async reentrant (e.g. can be read with a readahead generator).  Or, maybe this function could apply the readahead generator also?  So it would be:
   
   ```
    Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(std::shared_ptr<RandomAccessFile> file, int64_t block_size, int readahead);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1259697812

   rebased on master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1242763544

   I think the test errors are spurious. Should be ready to be merged.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968572712


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -24,6 +24,7 @@
 
 #include "arrow/io/type_fwd.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/async_generator.h"

Review Comment:
   I actually use a new generator that I implemented in async_generator.h



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r959949490


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -343,5 +344,8 @@ ARROW_EXPORT
 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
     std::shared_ptr<InputStream> stream, int64_t block_size);
 
+ARROW_EXPORT
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(std::shared_ptr<RandomAccessFile> file, int64_t block_size);

Review Comment:
   Added a comment 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail closed pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail closed pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3
URL: https://github.com/apache/arrow/pull/13931


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955602646


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +973,150 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
+class TonyReaderImpl : public ReaderMixin<std::shared_ptr<io::RandomAccessFile>>,

Review Comment:
   Ah thanks for pointing this out. I forgot to change this!! :-P 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955629720


##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
   return Loop(LoopBody{std::move(generator), std::move(vec)});
 }
 
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future
+template <typename T, typename ApplyFn,
+          typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+          typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun,
+                                     internal::Executor* cpu_exec) {
+  struct State {
+    explicit State(AsyncGenerator<T> source_gen_, ApplyFn apply_fun_,
+                   internal::Executor* cpu_exec_)
+        : source_gen(std::move(source_gen_)),
+          apply_fun(std::move(apply_fun_)),
+          cpu_exec(cpu_exec_),
+          finished(false) {}
+
+    AsyncGenerator<T> source_gen;
+    ApplyFn apply_fun;
+    internal::Executor* cpu_exec;
+    bool finished;
+  };
+
+  auto state =
+      std::make_shared<State>(std::move(source_gen), std::move(apply_fun), cpu_exec);
+  return [state]() {
+    CallbackOptions options;
+    options.executor = state->cpu_exec;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source_gen().Then(
+        [state](const T& next) -> Result<V> {
+          if (IsIterationEnd(next)) {
+            return IterationTraits<V>::End();
+          } else {
+            auto value = state->apply_fun(next);
+            if (!value.ok()) {
+              return Status::NotImplemented("not implemented");
+            } else {
+              return value.ValueOrDie();
+            }

Review Comment:
   As opposed to just finishing the generator? It needs to return a not ok status here, the error message should be fleshed out more. If it simply finishes instead of returning a not ok, some of the tests will fail. e.g. if you read invalid csv, the parse task would return a not ok status that needs to be reflected up this generator to result in a status not ok for dataset inspect or the like. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r950904697


##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -184,16 +186,45 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
   auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
 #endif
   ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
-  ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
   const auto& path = source.path();
-  ARROW_ASSIGN_OR_RAISE(
+
+  
+  auto actual_compression = Compression::type::UNCOMPRESSED;
+    // Guess compression from file extension
+  auto extension = fs::internal::GetAbstractPathExtension(path);
+  if (extension == "gz") {
+    actual_compression = Compression::type::GZIP;
+  } else {
+    auto maybe_compression = util::Codec::GetCompressionType(extension);
+    if (maybe_compression.ok()) {
+      ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
+    }
+  }
+  
+  Future<std::shared_ptr<csv::StreamingReader>> reader_fut;
+
+  if (actual_compression == Compression::type::UNCOMPRESSED) {
+    ARROW_ASSIGN_OR_RAISE(auto input, source.Open() )
+    reader_fut = DeferNotOk(input->io_context().executor()->Submit(
+      [=]() -> Future<std::shared_ptr<csv::StreamingReader>> {
+        ARROW_ASSIGN_OR_RAISE(auto temp_first_block, input->ReadAt(0, reader_options.block_size));
+        RETURN_NOT_OK(input->Seek(0));

Review Comment:
   Yes i plan to do that. This double read is very annoying. The whole logic around what to do with the first block needs to be streamlined and optimized. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1242105879

   OK this won't be supported in fast CSV reader then


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r950904667


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -343,5 +344,8 @@ ARROW_EXPORT
 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
     std::shared_ptr<InputStream> stream, int64_t block_size);
 
+ARROW_EXPORT
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(std::shared_ptr<RandomAccessFile> file, int64_t block_size);

Review Comment:
   I think it makes more sense to separate out the readahead generator if you are going to do this. I personally think a lot of interfaces in Arrow already have too many flags and it's not very easy for a new person to figure out what are the good flags to use. I don't think having this generator itself do the readahead will lead to too many performance benefits either?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r958854671


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -1166,10 +1313,26 @@ Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
   });
 }
 
+Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(

Review Comment:
   I think it's ok :-) these are all internal APIs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r977971724


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -887,30 +872,37 @@ class StreamingReaderImpl : public ReaderMixin,
  protected:
   Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer,
                                 AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
-                                int max_readahead) {
+                                int max_readahead, Executor* cpu_executor) {
     if (first_buffer == nullptr) {
       return Status::Invalid("Empty CSV file");
     }
 
     std::shared_ptr<Buffer> after_header;
-    ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
-                          ProcessHeader(first_buffer, &after_header));
+    ARROW_ASSIGN_OR_RAISE(
+        auto header_bytes_consumed,
+        ReaderMixin<InputType>::ProcessHeader(first_buffer, &after_header));
     bytes_decoded_->fetch_add(header_bytes_consumed);
 
-    auto parser_op =
-        BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_);
+    auto parser_op = BlockParsingOperator(
+        ReaderMixin<InputType>::io_context_, ReaderMixin<InputType>::parse_options_,
+        ReaderMixin<InputType>::num_csv_cols_, ReaderMixin<InputType>::num_rows_seen_);

Review Comment:
   Are these sorts of changes (where you are adding `ReaderMixin<InputType>::` needed to resolve some kind of ambiguity?  I don't know that we have any specific stance in the style guide so I'm not sure it's a problem but I'm curious why you made the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1256762625

   Need to keep up to date with the times (C++17)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968573914


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -343,5 +344,9 @@ ARROW_EXPORT
 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
     std::shared_ptr<InputStream> stream, int64_t block_size);
 
+ARROW_EXPORT
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(

Review Comment:
   If this is beneficial for reading from S3, should it perhaps be called in S3 and GCS filesystem implementations of `OpenInputStream` instead of just the CSV datasets reader?
   (does that sound doable/reasonable?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968579855


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -343,5 +344,9 @@ ARROW_EXPORT
 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
     std::shared_ptr<InputStream> stream, int64_t block_size);
 
+ARROW_EXPORT
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(

Review Comment:
   This sounds like a separate PR. 
   
   I think it is only beneficial if the downstream processing logic can keep up with the increased IO rate. That said I think it probably is the common case for reading from S3/GCS. I don't know what `OpenInputStream` in the filesystem interface is used for so I can't comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955653709


##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
   return Loop(LoopBody{std::move(generator), std::move(vec)});
 }
 
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future

Review Comment:
   That is right. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955628813


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +973,150 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
+class TonyReaderImpl : public ReaderMixin<std::shared_ptr<io::RandomAccessFile>>,
+                       public csv::StreamingReader,
+                       public std::enable_shared_from_this<TonyReaderImpl> {
+ public:
+  TonyReaderImpl(io::IOContext io_context, std::shared_ptr<io::RandomAccessFile> input,
+                 const ReadOptions& read_options, const ParseOptions& parse_options,
+                 const ConvertOptions& convert_options, bool count_rows)
+      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
+                    convert_options, count_rows),
+        bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {}
+
+  Future<> Init(Executor* cpu_executor) {
+    ARROW_ASSIGN_OR_RAISE(
+        AsyncGenerator<std::shared_ptr<Buffer>> ifile_gen,
+        io::MakeRandomAccessFileGenerator(input_, read_options_.block_size));
+
+    // TODO Consider exposing readahead as a read option (ARROW-12090)
+    auto prefetch_gen =
+        MakeReadaheadGenerator(ifile_gen, io_context_.executor()->GetCapacity());
+
+    auto transferred_it = MakeTransferredGenerator(prefetch_gen, cpu_executor);
+
+    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));
+
+    int max_readahead = cpu_executor->GetCapacity();
+    auto self = shared_from_this();
+
+    return buffer_generator().Then([self, buffer_generator, max_readahead, cpu_executor](
+                                       const std::shared_ptr<Buffer>& first_buffer) {
+      return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead,
+                                        cpu_executor);
+    });
+  }

Review Comment:
   The hope is that I will keep improving this TonyReaderImpl (or whatever I change the name to) to include more and more optimizations in subsequent PRs. E.g. parallel decode. So the amount of duplicated code is hopefully a temporary state. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1228976834

   failed CI has nothing to do with this PR. I am going to wait for this one to merge before proceeding with next PR that streamlines logic around first block and parallel decode. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r956240998


##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
   return Loop(LoopBody{std::move(generator), std::move(vec)});
 }
 
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future
+template <typename T, typename ApplyFn,
+          typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+          typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun,
+                                     internal::Executor* cpu_exec) {
+  struct State {
+    explicit State(AsyncGenerator<T> source_gen_, ApplyFn apply_fun_,
+                   internal::Executor* cpu_exec_)
+        : source_gen(std::move(source_gen_)),
+          apply_fun(std::move(apply_fun_)),
+          cpu_exec(cpu_exec_),
+          finished(false) {}
+
+    AsyncGenerator<T> source_gen;
+    ApplyFn apply_fun;
+    internal::Executor* cpu_exec;
+    bool finished;
+  };
+
+  auto state =
+      std::make_shared<State>(std::move(source_gen), std::move(apply_fun), cpu_exec);
+  return [state]() {
+    CallbackOptions options;
+    options.executor = state->cpu_exec;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source_gen().Then(
+        [state](const T& next) -> Result<V> {
+          if (IsIterationEnd(next)) {
+            return IterationTraits<V>::End();
+          } else {
+            auto value = state->apply_fun(next);
+            if (!value.ok()) {
+              return Status::NotImplemented("not implemented");
+            } else {
+              return value.ValueOrDie();
+            }

Review Comment:
   If the return value of `state->apply_fun(next)` is a not ok status then `return state->apply_fun(next)` will return a not ok status right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1242146952

   I agree that we should leave it for now.  In the future we should still be able to do parallel I/O, we would just need to have a serial compute path on top of it.  I suppose we could do something like `RandomAccessFile -> Generator<Buffer> -> Readahead -> Iterator<Buffer> -> InputStream` :laughing: but maybe we could find some solution with fewer layers too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968581709


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -24,6 +24,7 @@
 
 #include "arrow/io/type_fwd.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/async_generator.h"

Review Comment:
   Ah I see



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1261163907

   Messed up the rebase, made a new branch: https://github.com/apache/arrow/pull/14269


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r966382942


##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,47 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
   return Loop(LoopBody{std::move(generator), std::move(vec)});
 }
 
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future. Then we will launch each map fun as an independent task, instead of piggy
+/// backing it to the future from the source.

Review Comment:
   ```suggestion
   /// \brief Similar to MapGenerator but applies the map function in a new thread task
   /// This is similar to combining a map generator and transfer generator but the former
   /// would not be able to guarantee the map task runs on a new thread.
   ```



##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -184,27 +185,60 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
   auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
 #endif
   ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
-  ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
   const auto& path = source.path();
-  ARROW_ASSIGN_OR_RAISE(
-      input, io::BufferedInputStream::Create(reader_options.block_size,
-                                             default_memory_pool(), std::move(input)));
+
+  auto actual_compression = Compression::type::UNCOMPRESSED;
+  // Guess compression from file extension
+  auto extension = fs::internal::GetAbstractPathExtension(path);
+  if (extension == "gz") {
+    actual_compression = Compression::type::GZIP;
+  } else {
+    auto maybe_compression = util::Codec::GetCompressionType(extension);
+    if (maybe_compression.ok()) {
+      ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
+    }
+  }
+
+  Future<std::shared_ptr<csv::StreamingReader>> reader_fut;
+
+  if (actual_compression == Compression::type::UNCOMPRESSED) {

Review Comment:
   ```suggestion
     // If the file is uncompressed we open the reader with a RandomAccessFile which will
     // be capable of reading the file in parallel.  If the file is compressed we must use an
     // input stream and will be read sequentially.
     if (actual_compression == Compression::type::UNCOMPRESSED) {
   ```



##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -138,6 +136,44 @@ Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
   return Iterator<std::shared_ptr<Buffer>>(InputStreamBlockIterator(stream, block_size));
 }
 
+// this is async re-entrant and can be used with a Readahead Generator
+// perhaps you should only use it with a Readahead Generator, otherwise there
+// is no point in using this, just use MakeInputStreamIterator instead.
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(
+    std::shared_ptr<RandomAccessFile> file, int64_t block_size) {
+  struct State {
+    explicit State(std::shared_ptr<RandomAccessFile> file_, int64_t block_size_)
+        : file(std::move(file_)), block_size(block_size_), position(0) {}
+
+    Status Init() {
+      RETURN_NOT_OK(file->Seek(0));
+      ARROW_ASSIGN_OR_RAISE(total_size, file->GetSize());
+      return Status::OK();
+    }
+
+    std::shared_ptr<RandomAccessFile> file;
+    int64_t block_size;
+    int64_t total_size;
+    std::atomic<int64_t> position;
+  };
+
+  auto state = std::make_shared<State>(std::move(file), block_size);
+  RETURN_NOT_OK(state->Init());
+  return [state]() {
+    auto pos = state->position.fetch_add(state->block_size);
+    if (pos >= state->total_size) {
+      return AsyncGeneratorEnd<std::shared_ptr<Buffer>>();
+    }
+    // idx is guaranteed to be smaller than total size, but you might not be able to read
+    // a full block

Review Comment:
   ```suggestion
       // pos is guaranteed to be smaller than total size, but you might not be able to read
       // a full block
   ```



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,148 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968574445


##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -24,6 +24,7 @@
 
 #include "arrow/io/type_fwd.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/async_generator.h"

Review Comment:
   Not in this `.h`, only in the `.cc`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] lidavidm commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
lidavidm commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1242089619

   @marsupialtail from a quick scan - yeah, input transcoding would disable parallelism just like compression. I'd probably not worry about it: it already means we have to call back into Python to read data.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1246302510

   Should be good now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1242077237

   @lidavidm what do you think I should do with StreamWrapFunc?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
pitrou commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968572339


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,145 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+class ParallelStreamingReaderImpl

Review Comment:
   I'm not found of all the duplication here. How about having `StreamingReaderImpl` take a buffer-async-generator instead?



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,145 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+class ParallelStreamingReaderImpl

Review Comment:
   I'm not fond of all the duplication here. How about having `StreamingReaderImpl` take a buffer-async-generator instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r968604626


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,145 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+class ParallelStreamingReaderImpl

Review Comment:
   This code duplication is admittedly not ideal. However it's not just about reading faster through a new RandomAccessFile generator, but also parsing in parallel, and (in the future) decoding in parallel, which will introduce more changes to the StreamingReaderImpl. 
   
   After I finish the decoder in parallel in a new PR perhaps then it makes sense to reduce the deduplication. Or perhaps I can just add it to this PR, though I think this PR has enough value to be merged by itself. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r970319958


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,145 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+class ParallelStreamingReaderImpl

Review Comment:
   This has been addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1221134659

   https://issues.apache.org/jira/browse/ARROW-17481


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] github-actions[bot] commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1221134670

   :warning: Ticket **has not been started in JIRA**, please click 'Start Progress'.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955544496


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -545,11 +546,12 @@ class BlockDecodingOperator {
 /////////////////////////////////////////////////////////////////////////
 // Base class for common functionality
 
+template <typename T>

Review Comment:
   I wonder if this might be more readable as `InputType` instead of `T`.



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +973,150 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
+class TonyReaderImpl : public ReaderMixin<std::shared_ptr<io::RandomAccessFile>>,

Review Comment:
   `TonyReader` has a nice ring to it but I think `StreamingFileReader` might be easier to understand for future readers.



##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -138,6 +137,42 @@ Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
   return Iterator<std::shared_ptr<Buffer>>(InputStreamBlockIterator(stream, block_size));
 }
 
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(
+    std::shared_ptr<RandomAccessFile> file, int64_t block_size) {
+  struct State {
+    explicit State(std::shared_ptr<RandomAccessFile> file_, int64_t block_size_)
+        : file(std::move(file_)), block_size(block_size_), position(0) {}
+
+    Status init() {
+      RETURN_NOT_OK(file->Seek(0));
+      // if seek worked this will also work.
+      total_size = file->GetSize().ValueOrDie();
+      return Status::OK();

Review Comment:
   ```suggestion
         return file->GetSize();
   ```
   
   You are probably right but I'm not sure avoiding this if check is worth making this method slightly more confusing given it shouldn't really be called all that often.



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +973,150 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
+class TonyReaderImpl : public ReaderMixin<std::shared_ptr<io::RandomAccessFile>>,
+                       public csv::StreamingReader,
+                       public std::enable_shared_from_this<TonyReaderImpl> {
+ public:
+  TonyReaderImpl(io::IOContext io_context, std::shared_ptr<io::RandomAccessFile> input,
+                 const ReadOptions& read_options, const ParseOptions& parse_options,
+                 const ConvertOptions& convert_options, bool count_rows)
+      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
+                    convert_options, count_rows),
+        bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {}
+
+  Future<> Init(Executor* cpu_executor) {
+    ARROW_ASSIGN_OR_RAISE(
+        AsyncGenerator<std::shared_ptr<Buffer>> ifile_gen,
+        io::MakeRandomAccessFileGenerator(input_, read_options_.block_size));
+
+    // TODO Consider exposing readahead as a read option (ARROW-12090)
+    auto prefetch_gen =
+        MakeReadaheadGenerator(ifile_gen, io_context_.executor()->GetCapacity());
+
+    auto transferred_it = MakeTransferredGenerator(prefetch_gen, cpu_executor);
+
+    auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it));
+
+    int max_readahead = cpu_executor->GetCapacity();
+    auto self = shared_from_this();
+
+    return buffer_generator().Then([self, buffer_generator, max_readahead, cpu_executor](
+                                       const std::shared_ptr<Buffer>& first_buffer) {
+      return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead,
+                                        cpu_executor);
+    });
+  }

Review Comment:
   Is this the only method that differs from `StreamingReaderImpl`?  Is there any way the two streaming readers could extend some kind of `StreamingReaderBase` to avoid duplication?



##########
cpp/src/arrow/csv/reader.h:
##########
@@ -101,6 +102,10 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
       io::IOContext io_context, std::shared_ptr<io::InputStream> input,
       arrow::internal::Executor* cpu_executor, const ReadOptions&, const ParseOptions&,
       const ConvertOptions&);
+  static Future<std::shared_ptr<StreamingReader>> MakeAsync(

Review Comment:
   Let's add at least a brief comment here explaining the difference between the two reader types.



##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -19,6 +19,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <iostream>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
   return Loop(LoopBody{std::move(generator), std::move(vec)});
 }
 
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future
+template <typename T, typename ApplyFn,
+          typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+          typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun,
+                                     internal::Executor* cpu_exec) {
+  struct State {
+    explicit State(AsyncGenerator<T> source_gen_, ApplyFn apply_fun_,
+                   internal::Executor* cpu_exec_)
+        : source_gen(std::move(source_gen_)),
+          apply_fun(std::move(apply_fun_)),
+          cpu_exec(cpu_exec_),
+          finished(false) {}
+
+    AsyncGenerator<T> source_gen;
+    ApplyFn apply_fun;
+    internal::Executor* cpu_exec;
+    bool finished;
+  };
+
+  auto state =
+      std::make_shared<State>(std::move(source_gen), std::move(apply_fun), cpu_exec);
+  return [state]() {
+    CallbackOptions options;
+    options.executor = state->cpu_exec;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source_gen().Then(
+        [state](const T& next) -> Result<V> {
+          if (IsIterationEnd(next)) {
+            return IterationTraits<V>::End();
+          } else {
+            auto value = state->apply_fun(next);
+            if (!value.ok()) {
+              return Status::NotImplemented("not implemented");
+            } else {
+              return value.ValueOrDie();
+            }

Review Comment:
   ```suggestion
               return state->apply_fun(next);
   ```
   I'm not entirely sure why you are rewriting the failure to "not implemented".



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <cstring>
 #include <functional>
+#include <iostream>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -138,6 +137,42 @@ Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
   return Iterator<std::shared_ptr<Buffer>>(InputStreamBlockIterator(stream, block_size));
 }
 
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(
+    std::shared_ptr<RandomAccessFile> file, int64_t block_size) {
+  struct State {
+    explicit State(std::shared_ptr<RandomAccessFile> file_, int64_t block_size_)
+        : file(std::move(file_)), block_size(block_size_), position(0) {}
+
+    Status init() {

Review Comment:
   ```suggestion
       Status Init() {
   ```



##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -18,6 +18,7 @@
 #include "arrow/dataset/file_csv.h"
 
 #include <algorithm>
+#include <iostream>

Review Comment:
   ```suggestion
   ```



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -1166,10 +1313,26 @@ Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
   });
 }
 
+Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(

Review Comment:
   I'll defer to the input of others but I think an overload of `MakeStreamingReader` based solely on the type of `input` is a rather subtle API.  Could we maybe just rename this to `MakeStreamingFileReader`?



##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
   return Loop(LoopBody{std::move(generator), std::move(vec)});
 }
 
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future

Review Comment:
   Can you expand on this description?  I'm pretty sure the important part is that you are launching a new thread task with each input right?



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -1287,6 +1450,14 @@ Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
                              parse_options, convert_options);
 }
 
+Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
+    io::IOContext io_context, std::shared_ptr<io::RandomAccessFile> input,
+    Executor* cpu_executor, const ReadOptions& read_options,
+    const ParseOptions& parse_options, const ConvertOptions& convert_options) {
+  return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
+                             parse_options, convert_options);
+}
+

Review Comment:
   I suppose the same comment I had for `MakeStreamingReader` applies here too but `MakeFileAsync` doesn't really make sense so maybe we can leave this method name alone and just live with an overload but I'm open to suggestions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] marsupialtail commented on a diff in pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r977986717


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -887,30 +872,37 @@ class StreamingReaderImpl : public ReaderMixin,
  protected:
   Future<> InitAfterFirstBuffer(const std::shared_ptr<Buffer>& first_buffer,
                                 AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
-                                int max_readahead) {
+                                int max_readahead, Executor* cpu_executor) {
     if (first_buffer == nullptr) {
       return Status::Invalid("Empty CSV file");
     }
 
     std::shared_ptr<Buffer> after_header;
-    ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed,
-                          ProcessHeader(first_buffer, &after_header));
+    ARROW_ASSIGN_OR_RAISE(
+        auto header_bytes_consumed,
+        ReaderMixin<InputType>::ProcessHeader(first_buffer, &after_header));
     bytes_decoded_->fetch_add(header_bytes_consumed);
 
-    auto parser_op =
-        BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, num_rows_seen_);
+    auto parser_op = BlockParsingOperator(
+        ReaderMixin<InputType>::io_context_, ReaderMixin<InputType>::parse_options_,
+        ReaderMixin<InputType>::num_csv_cols_, ReaderMixin<InputType>::num_rows_seen_);

Review Comment:
   Needs the qualifiers to compile



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] westonpace commented on pull request #13931: ARROW-17481: [C++] [Python] Major performance improvements to CSV reading from S3

Posted by GitBox <gi...@apache.org>.
westonpace commented on PR #13931:
URL: https://github.com/apache/arrow/pull/13931#issuecomment-1256631455

   Can you fix the appveyor failure?  I suspect you just need to update the test case to the new approach.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org