Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/06/10 20:46:46 UTC

[GitHub] [arrow] n3world opened a new pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

n3world opened a new pull request #10509:
URL: https://github.com/apache/arrow/pull/10509


   Add bytes_read() to the StreamingReader interface so that the progress of the stream can be determined easily and accurately by the user.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] n3world commented on pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
n3world commented on pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#issuecomment-861834800


   > Looks good, one minor nit: If there is just one property then `bytes_read` makes sense. So the API is fine. Internally there are two properties `bytes_read_` and `bytes_parsed_` which is a little confusing to me because I immediately thought `bytes_read_` meant "read but not parsed" since the order is "read->parse->decode". Maybe change `bytes_read_` to `bytes_decoded_` but leave it as `bytes_read` at the API level?
   
   I only named the variable bytes_read_ to match the method name, so if you are fine with bytes_read() returning the value of bytes_decoded_, I'll make that change.





[GitHub] [arrow] pitrou closed pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
pitrou closed pull request #10509:
URL: https://github.com/apache/arrow/pull/10509


   





[GitHub] [arrow] pitrou commented on pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#issuecomment-862202034


   This PR will make it a bit complicated to add a parallel streaming reader. @westonpace Are you ok with this?





[GitHub] [arrow] pitrou commented on pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#issuecomment-861481953


   @westonpace You might be curious about this.





[GitHub] [arrow] pitrou commented on a change in pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#discussion_r652091653



##########
File path: cpp/src/arrow/csv/reader.h
##########
@@ -73,6 +73,9 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
 
   virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() = 0;
 
+  /// \brief Returns the number of bytes which have been read by this reader
+  virtual int64_t bytes_read() const = 0;

Review comment:
       A clearer docstring would be fine with me.







[GitHub] [arrow] pitrou edited a comment on pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
pitrou edited a comment on pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#issuecomment-862202034


   This PR will make it a bit more complicated to add a parallel streaming reader. @westonpace Are you ok with this?





[GitHub] [arrow] westonpace commented on a change in pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
westonpace commented on a change in pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#discussion_r652079281



##########
File path: cpp/src/arrow/csv/reader.h
##########
@@ -73,6 +73,9 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
 
   virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() = 0;
 
+  /// \brief Returns the number of bytes which have been read by this reader
+  virtual int64_t bytes_read() const = 0;

Review comment:
       I think `bytes_read` means "bytes the CSV reader is completely finished with".  So serial readahead (e.g. the readahead happening on the I/O context or "data read but not parsed or decoded") should not be included.  Caller consumption should be irrelevant.
   
   For parallel readahead (e.g. the CSV reader reading/parsing/decoding multiple batches of data at the same time) then my opinion is that `bytes_read` should be incremented as soon as a batch is ready to be delivered (even if there are other batches in front of it that aren't ready).
   
   Perhaps `bytes_processed` or `bytes_finished` would remove the ambiguity?  Or maybe just a clearer docstring.







[GitHub] [arrow] n3world commented on a change in pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#discussion_r652160176



##########
File path: cpp/src/arrow/csv/reader.h
##########
@@ -73,6 +73,9 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
 
   virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() = 0;
 
+  /// \brief Returns the number of bytes which have been read by this reader
+  virtual int64_t bytes_read() const = 0;

Review comment:
       I updated it to
   ```
     /// - bytes skipped by `ReadOptions.skip_rows` will be counted as being read before
     /// any records are returned.
     /// - bytes read while parsing the header will be counted as being read before any
     /// records are returned.
     /// - bytes skipped by `ReadOptions.skip_rows_after_names` will be counted after the
     /// first batch is returned.
     ///
     /// \return the number of bytes which have been read from the CSV stream and returned to
     /// caller
   ```







[GitHub] [arrow] github-actions[bot] commented on pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#issuecomment-859051572


   https://issues.apache.org/jira/browse/ARROW-12996





[GitHub] [arrow] n3world commented on a change in pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
n3world commented on a change in pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#discussion_r652157067



##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -216,6 +216,27 @@ TEST(StreamingReaderTests, NestedParallelism) {
   TestNestedParallelism(thread_pool, table_factory);
 }
 
+TEST(StreamingReaderTest, BytesRead) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto table_buffer =
+      std::make_shared<Buffer>("a,b,c\n123,456,789\n101,112,131\n415,161,718\n");
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+  auto read_options = ReadOptions::Defaults();
+  read_options.block_size = 20;
+  ASSERT_OK_AND_ASSIGN(
+      auto streaming_reader,
+      StreamingReader::Make(io::default_io_context(), input, read_options,
+                            ParseOptions::Defaults(), ConvertOptions::Defaults()));
+  std::shared_ptr<RecordBatch> batch;
+  int64_t bytes = 6;  // Size of header
+  do {
+    ASSERT_EQ(bytes, streaming_reader->bytes_read());
+    ASSERT_OK(streaming_reader->ReadNext(&batch));
+    bytes += 12;  // Add size of each row
+  } while (batch);
+  ASSERT_EQ(42, streaming_reader->bytes_read());
+}

Review comment:
       Done







[GitHub] [arrow] pitrou commented on pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
pitrou commented on pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#issuecomment-862192671


   Calling it `bytes_read()` is fine with me.





[GitHub] [arrow] pitrou commented on a change in pull request #10509: ARROW-12996: Add bytes_read() to StreamingReader

Posted by GitBox <gi...@apache.org>.
pitrou commented on a change in pull request #10509:
URL: https://github.com/apache/arrow/pull/10509#discussion_r651769022



##########
File path: cpp/src/arrow/csv/reader.h
##########
@@ -73,6 +73,9 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
 
   virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() = 0;
 
+  /// \brief Returns the number of bytes which have been read by this reader
+  virtual int64_t bytes_read() const = 0;

Review comment:
       The docstring may be a bit imprecise here. If there is some readahead going on, is it included in the result? Or is it the number of bytes corresponding to the batches already consumed by the caller?

##########
File path: cpp/src/arrow/csv/reader_test.cc
##########
@@ -216,6 +216,27 @@ TEST(StreamingReaderTests, NestedParallelism) {
   TestNestedParallelism(thread_pool, table_factory);
 }
 
+TEST(StreamingReaderTest, BytesRead) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto table_buffer =
+      std::make_shared<Buffer>("a,b,c\n123,456,789\n101,112,131\n415,161,718\n");
+  auto input = std::make_shared<io::BufferReader>(table_buffer);
+  auto read_options = ReadOptions::Defaults();
+  read_options.block_size = 20;
+  ASSERT_OK_AND_ASSIGN(
+      auto streaming_reader,
+      StreamingReader::Make(io::default_io_context(), input, read_options,
+                            ParseOptions::Defaults(), ConvertOptions::Defaults()));
+  std::shared_ptr<RecordBatch> batch;
+  int64_t bytes = 6;  // Size of header
+  do {
+    ASSERT_EQ(bytes, streaming_reader->bytes_read());
+    ASSERT_OK(streaming_reader->ReadNext(&batch));
+    bytes += 12;  // Add size of each row
+  } while (batch);
+  ASSERT_EQ(42, streaming_reader->bytes_read());
+}

Review comment:
       Can you also add a test where the `skip_rows` and/or `skip_rows_after_names` options are set? What should be the semantics there?



