Posted to github@arrow.apache.org by "kou (via GitHub)" <gi...@apache.org> on 2023/06/28 06:01:01 UTC

[GitHub] [arrow] kou opened a new pull request, #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

kou opened a new pull request, #36344:
URL: https://github.com/apache/arrow/pull/36344

   ### Rationale for this change
   
   Because they (the pull-based reader and the push-based decoder) must behave the same way.
   
   ### What changes are included in this PR?
   
   This PR extracts reusable code from StreamDecoderImpl into StreamDecoderInternal. The external APIs of
   RecordBatchStreamReader and StreamDecoder aren't changed.
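A minimal sketch of the idea, under stated assumptions: a hypothetical `DecoderCore` owns the single state machine (schema, then the initial dictionaries, then record batches), and two thin front ends feed it. `PushDecoder::Consume()` stands in for the push-based StreamDecoder and `PullReader::ReadNext()` for the pull-based RecordBatchStreamReader. Messages are reduced to a type tag and a string payload. None of these names are Arrow APIs.

```cpp
#include <cstddef>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical message model: a type tag plus an opaque payload.
enum class MessageType { kSchema, kDictionary, kRecordBatch };

struct Message {
  MessageType type;
  std::string payload;
};

// Shared decoding logic (the role StreamDecoderInternal plays in the PR).
class DecoderCore {
 public:
  explicit DecoderCore(int num_required_dictionaries)
      : num_required_dictionaries_(num_required_dictionaries) {}

  // Advances the state machine by one message; decoded batches are queued.
  void OnMessageDecoded(const Message& message) {
    switch (state_) {
      case State::kSchema:
        if (message.type != MessageType::kSchema) {
          throw std::runtime_error("expected a schema message");
        }
        state_ = num_required_dictionaries_ > 0 ? State::kInitialDictionaries
                                                : State::kRecordBatches;
        break;
      case State::kInitialDictionaries:
        // All initial dictionaries must arrive before the first batch.
        if (message.type != MessageType::kDictionary) {
          throw std::runtime_error("expected an initial dictionary");
        }
        if (++num_read_dictionaries_ == num_required_dictionaries_) {
          state_ = State::kRecordBatches;
        }
        break;
      case State::kRecordBatches:
        // Dictionary deltas would be applied here; this sketch only keeps batches.
        if (message.type == MessageType::kRecordBatch) {
          batches_.push_back(message.payload);
        }
        break;
    }
  }

  std::vector<std::string>& batches() { return batches_; }

 private:
  enum class State { kSchema, kInitialDictionaries, kRecordBatches };
  State state_ = State::kSchema;
  int num_required_dictionaries_;
  int num_read_dictionaries_ = 0;
  std::vector<std::string> batches_;
};

// Push-based front end: the caller hands over each message as it arrives.
class PushDecoder {
 public:
  explicit PushDecoder(int num_dicts) : core_(num_dicts) {}
  void Consume(const Message& message) { core_.OnMessageDecoded(message); }
  const std::vector<std::string>& decoded() { return core_.batches(); }

 private:
  DecoderCore core_;
};

// Pull-based front end: reads messages until one batch is available.
class PullReader {
 public:
  PullReader(std::vector<Message> source, int num_dicts)
      : source_(std::move(source)), core_(num_dicts) {}

  // Returns std::nullopt once the source is exhausted.
  std::optional<std::string> ReadNext() {
    auto& batches = core_.batches();
    while (batches.empty() && next_ < source_.size()) {
      core_.OnMessageDecoded(source_[next_++]);
    }
    if (batches.empty()) return std::nullopt;
    std::string batch = std::move(batches.front());
    batches.erase(batches.begin());
    return batch;
  }

 private:
  std::vector<Message> source_;
  std::size_t next_ = 0;
  DecoderCore core_;
};
```

Because both front ends call the same `OnMessageDecoded()`, a fix to message handling (for example, to the ordering of initial dictionaries) applies to both at once.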
   
   This PR adds some external APIs to implement this:
   
   * arrow::Status::ToStringWithoutContextLines(): This is only for testing. It lets ASSERT_RAISES_WITH_MESSAGE() produce stable results with or without -DARROW_EXTRA_ERROR_CONTEXT=ON.
   
     We can extract this and related changes into a separate PR if we want.
   
   * arrow::ipc::Listener::OnRecordBatchWithMetadataDecoded(): RecordBatchStreamReader needs not only the RecordBatch but also its custom metadata, so OnRecordBatchWithMetadataDecoded() receives a RecordBatchWithMetadata. OnRecordBatchDecoded() still exists and is used by default for backward compatibility.
   
   * arrow::ipc::CollectListener::metadatas(), arrow::ipc::CollectListener::num_record_batches(), arrow::ipc::CollectListener::PopRecordBatch(), arrow::ipc::CollectListener::PopRecordBatchWithMetadata(): With these APIs, we can use CollectListener in RecordBatchStreamReader. If we don't want to extend CollectListener, we can instead create an internal listener only for RecordBatchStreamReader.
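The backward-compatibility scheme for OnRecordBatchWithMetadataDecoded() can be sketched as a virtual hook whose default implementation forwards to the older hook, so existing subclasses that only override OnRecordBatchDecoded() keep working. This is a simplified, hypothetical model: `Batch`, `Metadata`, and `BatchWithMetadata` stand in for RecordBatch, KeyValueMetadata, and RecordBatchWithMetadata, and `bool` replaces arrow::Status.

```cpp
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-ins for RecordBatch, KeyValueMetadata and RecordBatchWithMetadata.
struct Batch {
  std::string name;
};
using Metadata = std::map<std::string, std::string>;
struct BatchWithMetadata {
  std::shared_ptr<Batch> batch;
  std::shared_ptr<Metadata> custom_metadata;
};

class Listener {
 public:
  virtual ~Listener() = default;

  // Old hook: sees only the batch. Returns false ("not handled") by default.
  virtual bool OnRecordBatchDecoded(std::shared_ptr<Batch> /*batch*/) { return false; }

  // New hook: by default drops the metadata and forwards to the old hook,
  // so listeners written against the old API keep working unchanged.
  virtual bool OnRecordBatchWithMetadataDecoded(BatchWithMetadata batch_with_metadata) {
    return OnRecordBatchDecoded(std::move(batch_with_metadata.batch));
  }
};

// An existing listener that only overrides the old hook.
class LegacyListener : public Listener {
 public:
  bool OnRecordBatchDecoded(std::shared_ptr<Batch> batch) override {
    names.push_back(batch->name);
    return true;
  }
  std::vector<std::string> names;
};

// A new listener that also keeps the custom metadata.
class MetadataAwareListener : public Listener {
 public:
  bool OnRecordBatchWithMetadataDecoded(BatchWithMetadata batch_with_metadata) override {
    collected.push_back(std::move(batch_with_metadata));
    return true;
  }
  std::vector<BatchWithMetadata> collected;
};

// The decoder only ever calls the new hook.
bool Deliver(Listener& listener, BatchWithMetadata batch_with_metadata) {
  return listener.OnRecordBatchWithMetadataDecoded(std::move(batch_with_metadata));
}
```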
   
   ### Are these changes tested?
   
   Yes.
   
   ### Are there any user-facing changes?
   
   Yes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] kou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1614010883

   > It seems like Homebrew is broken currently?
   
   I've fixed it by #36403 and rebased on main.


[GitHub] [arrow] kou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1614058881

   AppVeyor failure: #36405


[GitHub] [arrow] kou merged pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou merged PR #36344:
URL: https://github.com/apache/arrow/pull/36344


[GitHub] [arrow] thisisnic commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "thisisnic (via GitHub)" <gi...@apache.org>.
thisisnic commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1614238135

   R failures are unrelated; the fix is in #36397 


[GitHub] [arrow] pitrou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1613279888

   It seems like Homebrew is broken currently? @kou


[GitHub] [arrow] kou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1617441227

   #36397 was merged and I've rebased on main.


[GitHub] [arrow] kou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1612480766

   The integration test failures are fixed.


[GitHub] [arrow] kou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1244724170


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +323,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }

Review Comment:
   We may want to use a better name for a vector of metadata...



[GitHub] [arrow] pitrou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1246824754


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   Ahh... Isn't it error-prone that `schema()` returns the filtered schema (`out_schema_`) but `OnSchemaDecoded` is called with the unfiltered schema?
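To make the mismatch concrete: when IpcReadOptions::included_fields selects a subset of top-level columns, the reader keeps the full schema (`schema_`) for decoding and derives a filtered schema (`out_schema_`) for callers. A simplified, hypothetical model with field names as strings; `MakeInclusionMask()` and `FilterSchema()` are illustrative helpers, not Arrow functions, and nested-field handling is ignored.

```cpp
#include <cstddef>
#include <string>
#include <vector>

using Schema = std::vector<std::string>;  // field names only, for illustration

// Build a per-field inclusion mask from the requested indices.
// An empty selection means "read every field".
std::vector<bool> MakeInclusionMask(std::size_t num_fields, const std::vector<int>& included) {
  std::vector<bool> mask(num_fields, included.empty());
  for (int i : included) {
    if (i >= 0 && static_cast<std::size_t>(i) < num_fields) mask[i] = true;
  }
  return mask;
}

// Keep only the fields whose mask bit is set.
Schema FilterSchema(const Schema& full, const std::vector<bool>& mask) {
  Schema out;
  for (std::size_t i = 0; i < full.size(); ++i) {
    if (mask[i]) out.push_back(full[i]);
  }
  return out;
}
```

A listener that receives the full schema in OnSchemaDecoded() but batches shaped like the filtered one has to reconcile the two on its own, which is the error-prone part.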



[GitHub] [arrow] kou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1614238784

   I think that #36381 introduced it and it's tracked by #36396.


[GitHub] [arrow] pitrou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1614233145

   The R failures look unexpected. @paleolimbot @thisisnic Any clue about them?


[GitHub] [arrow] conbench-apache-arrow[bot] commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "conbench-apache-arrow[bot] (via GitHub)" <gi...@apache.org>.
conbench-apache-arrow[bot] commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1626411961

   Conbench analyzed the 6 benchmark runs on commit `7ebc88c8`.
   
   There were 3 benchmark results indicating a performance regression:
   
   - Commit Run on `arm64-m6g-linux-compute` at [2023-07-04 02:18:04Z](http://conbench.ursa.dev/compare/runs/72e6754210ee48378b7ace13ec0e6a55...8fbeb0cab5174a05af8337c88b0cf371/)
     - [params=4096, source=cpp-micro, suite=parquet-encoding-benchmark](http://conbench.ursa.dev/compare/benchmarks/064a37617dda73c28000d89ba82b9442...064a38217dfc7dcb8000353462f15253)
     - [source=cpp-micro, suite=parquet-bloom-filter-benchmark](http://conbench.ursa.dev/compare/benchmarks/064a3761b2c27aa68000217525b60f60...064a3820be427e11800048b9a20ab5f6)
   - and 1 more (see the report linked below)
   
   The [full Conbench report](https://github.com/apache/arrow/runs/14874203434) has more details.


[GitHub] [arrow] kou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1246016604


##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }
+
+  /// \return the number of collected record batches
+  size_t num_record_batches() const { return record_batches_.size(); }
+
+  /// \return the last decoded record batch and remove it from
+  /// record_batches
+  std::shared_ptr<RecordBatch> PopRecordBatch() {
+    auto record_batch_with_metadata = PopRecordBatchWithMetadata();
+    return std::move(record_batch_with_metadata.batch);
+  }
+
+  /// \return the last decoded record batch with custom metadata and
+  /// remove it from record_batches
+  RecordBatchWithMetadata PopRecordBatchWithMetadata() {
+    RecordBatchWithMetadata record_batch_with_metadata;
+    if (record_batches_.empty()) {
+      return record_batch_with_metadata;
+    }
+    record_batch_with_metadata.batch =
+        std::move(record_batches_[record_batches_.size() - 1]);
+    record_batch_with_metadata.custom_metadata =
+        std::move(metadatas_[metadatas_.size() - 1]);

Review Comment:
   Thanks! It's what I want!
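A self-contained sketch of the pop semantics in the CollectListener hunk, assuming simplified stand-in types (`Batch`, `Metadata`, and `Collector` are hypothetical, not Arrow's). The quoted hunk ends right after the two moves, so removing the moved-from tail elements with `pop_back()` is an assumption of this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct Batch { std::string name; };
struct Metadata { std::string value; };
struct BatchWithMetadata {
  std::shared_ptr<Batch> batch;
  std::shared_ptr<Metadata> custom_metadata;
};

// Keeps batches and their (possibly null) metadata in parallel vectors.
class Collector {
 public:
  void Add(std::shared_ptr<Batch> batch, std::shared_ptr<Metadata> metadata) {
    record_batches_.push_back(std::move(batch));
    metadatas_.push_back(std::move(metadata));
  }

  std::size_t num_record_batches() const { return record_batches_.size(); }

  // Returns the most recently added batch and removes it. A result with a
  // null batch signals that nothing has been collected.
  BatchWithMetadata PopRecordBatchWithMetadata() {
    BatchWithMetadata result;
    if (record_batches_.empty()) return result;
    assert(record_batches_.size() == metadatas_.size());
    result.batch = std::move(record_batches_.back());
    result.custom_metadata = std::move(metadatas_.back());
    record_batches_.pop_back();
    metadatas_.pop_back();
    return result;
  }

 private:
  std::vector<std::shared_ptr<Batch>> record_batches_;
  std::vector<std::shared_ptr<Metadata>> metadatas_;
};
```

Popping from the back is cheap. If the pull-based reader drains the listener after each decoded message, as seems likely, only one batch is pending at a time and the LIFO order does not matter; a caller that lets several batches accumulate would get them newest first.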



[GitHub] [arrow] kou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1612396697

   The integration test failures seem related:
   
   https://github.com/apache/arrow/actions/runs/5407939797/jobs/9826488521?pr=36344#step:7:14040
   
   ```text
   ################# FAILURES #################
   FAILED TEST: nested_dictionary Rust producing,  C++ consuming
   
   FAILED TEST: shared_dict C++ producing,  C++ consuming
   ```


[GitHub] [arrow] kou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1247489251


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   OK. I've opened #36407 for it.



[GitHub] [arrow] kou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1247364858


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   I can understand your concern.
   
   How about adding `Listener::OnSchemaDecoded(std::shared_ptr<Schema> schema, std::shared_ptr<Schema> filtered_schema)`, which calls `Listener::OnSchemaDecoded(std::shared_ptr<Schema> schema)` by default to keep backward compatibility? (Is `filtered_schema` a good name? I think `out_schema` is hard to understand.)
   
   If we want to do it, I'd like to do it in a separate pull request rather than in this one, because it's not related to this pull request.
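The proposal above can be sketched as an overload whose default implementation ignores the filtered schema and calls the existing one-argument hook. This is a hypothetical model of the suggestion, not Arrow code: `Schema` is a stand-in type, `Dispatch()` plays the role of the decoder, and `filtered_schema` just follows the name floated in the comment.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for arrow::Schema: just the field names.
struct Schema {
  std::vector<std::string> fields;
};

class Listener {
 public:
  virtual ~Listener() = default;

  // Existing hook: receives the schema as written in the stream.
  virtual void OnSchemaDecoded(std::shared_ptr<Schema> /*schema*/) {}

  // Proposed hook: also receives the schema after field filtering.
  // The default ignores filtered_schema and forwards to the existing hook.
  virtual void OnSchemaDecoded(std::shared_ptr<Schema> schema,
                               std::shared_ptr<Schema> /*filtered_schema*/) {
    OnSchemaDecoded(std::move(schema));
  }
};

// An existing listener: overrides only the one-argument hook.
class LegacyListener : public Listener {
 public:
  using Listener::OnSchemaDecoded;  // keep the other overload visible
  void OnSchemaDecoded(std::shared_ptr<Schema> schema) override {
    num_fields = schema->fields.size();
  }
  std::size_t num_fields = 0;
};

// A listener that opts into seeing both schemas.
class FilterAwareListener : public Listener {
 public:
  using Listener::OnSchemaDecoded;
  void OnSchemaDecoded(std::shared_ptr<Schema> schema,
                       std::shared_ptr<Schema> filtered_schema) override {
    num_fields = schema->fields.size();
    num_filtered_fields = filtered_schema->fields.size();
  }
  std::size_t num_fields = 0;
  std::size_t num_filtered_fields = 0;
};

// The decoder would always call the two-argument form.
void Dispatch(Listener& listener, std::shared_ptr<Schema> schema,
              std::shared_ptr<Schema> filtered_schema) {
  listener.OnSchemaDecoded(std::move(schema), std::move(filtered_schema));
}
```

The `using Listener::OnSchemaDecoded;` lines avoid the usual C++ pitfall where overriding one overload hides the other in the derived class.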



[GitHub] [arrow] pitrou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1247480486


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   That's a good idea, thanks for suggesting it.



[GitHub] [arrow] kou commented on pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on PR #36344:
URL: https://github.com/apache/arrow/pull/36344#issuecomment-1619296927

   The R failures have gone.
   I'll merge this.


[GitHub] [arrow] pitrou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1246824754


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   Ahh... Isn't it error-prone that `schema()` returns the filtered schema (`out_schema_`) but `OnSchemaDecoded` is called with the unfiltered schema (`schema_`)?



[GitHub] [arrow] kou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "kou (via GitHub)" <gi...@apache.org>.
kou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1246022396


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number (", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());

Review Comment:
   Good catch!
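The two ReadNext() overloads in the hunk above follow a common adapter pattern: the new Result-returning ReadNext() carries the batch plus its custom metadata, and the existing Status/out-parameter ReadNext() delegates to it and keeps only the batch. A self-contained sketch of that pattern, using simplified hypothetical stand-ins for arrow::Status and arrow::Result rather than Arrow's own types (Arrow code would use ARROW_ASSIGN_OR_RAISE instead of the manual check). As in RecordBatchReader, a null batch marks the end of the stream.

```cpp
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Minimal stand-ins for arrow::Status and arrow::Result<T> (hypothetical).
struct Status {
  std::string error;  // empty means OK
  bool ok() const { return error.empty(); }
};

template <typename T>
class Result {
 public:
  Result(T value) : v_(std::move(value)) {}
  Result(Status status) : v_(std::move(status)) {}
  bool ok() const { return std::holds_alternative<T>(v_); }
  Status status() const { return ok() ? Status{} : std::get<Status>(v_); }
  T MoveValue() { return std::move(std::get<T>(v_)); }

 private:
  std::variant<T, Status> v_;
};

struct Batch { int id; };
struct BatchWithMetadata {
  std::shared_ptr<Batch> batch;  // null at end of stream
  std::shared_ptr<std::string> custom_metadata;
};

// Hypothetical reader over a list of batch ids; a negative id is "corrupt".
class Reader {
 public:
  explicit Reader(std::vector<int> ids) : ids_(std::move(ids)) {}

  // New API: the batch together with its custom metadata.
  Result<BatchWithMetadata> ReadNext() {
    if (next_ == ids_.size()) return BatchWithMetadata{};  // end of stream
    if (ids_[next_] < 0) return Status{"invalid message"};
    int id = ids_[next_++];
    return BatchWithMetadata{std::make_shared<Batch>(Batch{id}),
                             std::make_shared<std::string>("batch " + std::to_string(id))};
  }

  // Older out-parameter API kept for compatibility: drops the metadata.
  Status ReadNext(std::shared_ptr<Batch>* out) {
    auto result = ReadNext();
    if (!result.ok()) return result.status();
    *out = result.MoveValue().batch;
    return Status{};
  }

 private:
  std::vector<int> ids_;
  std::size_t next_ = 0;
};
```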



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }

Review Comment:
   Ah, I just used the same signature as `record_batches()`, but we should use a const-ref.
   I can't remember why I didn't use a const-ref for `record_batches()`; I probably just didn't notice it...
   
   I've also changed `record_batches()` to use a const-ref. It breaks backward compatibility, but I hope nobody relies on it.
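A small illustration of the trade-off discussed here, with a hypothetical `Collector` class rather than CollectListener itself: returning by value copies the vector on every call, while returning a const reference exposes the member directly.

```cpp
#include <string>
#include <utility>
#include <vector>

class Collector {
 public:
  void Add(std::string item) { items_.push_back(std::move(item)); }

  // By value: every call copies the whole vector.
  std::vector<std::string> items_copy() const { return items_; }

  // By const reference: no copy; the reference is valid as long as the
  // Collector lives, and it reflects later additions.
  const std::vector<std::string>& items() const { return items_; }

 private:
  std::vector<std::string> items_;
};
```

Call sites that copy the result into a local (`auto batches = collector.items();`) keep compiling either way, but the function signature, and therefore the ABI, changes.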



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number (", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());
+    while (collect_listener->num_record_batches() == 0) {

Review Comment:
   Ah, it's better.



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }
+
+  /// \return the number of collected record batches
+  size_t num_record_batches() const { return record_batches_.size(); }

Review Comment:
   Ah, yes. Other places use `int64_t` or `int` for the number of record batches.
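   For illustration, here is a hedged sketch of the count accessor returning `int64_t`. The class name is hypothetical and `std::string` stands in for `RecordBatch`. The `static_cast` makes the `size_t` to `int64_t` conversion explicit, so callers can compare the result against signed loop counters without `-Wsign-compare` warnings.

   ```cpp
   #include <cstdint>
   #include <memory>
   #include <string>
   #include <utility>
   #include <vector>

   // Hypothetical stand-in; std::string replaces RecordBatch.
   class CountingCollectorSketch {
    public:
     void Add(std::shared_ptr<std::string> batch) { batches_.push_back(std::move(batch)); }

     // Signed 64-bit count, matching the convention used for batch counts elsewhere.
     int64_t num_record_batches() const { return static_cast<int64_t>(batches_.size()); }

    private:
     std::vector<std::shared_ptr<std::string>> batches_;
   };
   ```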



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -251,7 +251,8 @@ class ARROW_EXPORT Listener {
   /// \see StreamDecoder
   virtual Status OnEOS();
 
-  /// \brief Called when a record batch is decoded.
+  /// \brief Called when a record batch is decoded and
+  /// OnReocrdBatchDecoded() isn't overrided.

Review Comment:
   Oh...



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   https://github.com/apache/arrow/pull/36344/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR940 and https://github.com/apache/arrow/pull/36344/files#diff-e992169684aea9845ac776ada4cbb2b5dc711b49e5a3fbc6046c92299e1aefceR954 .
   (I understand that this PR's diff is difficult to review...)
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow] pitrou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

Posted by "pitrou (via GitHub)" <gi...@apache.org>.
pitrou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1245006878


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number (", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());

Review Comment:
   `checked_cast` here?
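   Arrow has a helper for this in `arrow/util/checked_cast.h`. The code below is not that header. It is a simplified, hypothetical stand-in (`checked_ptr_cast`, `ListenerStub`, `CollectListenerStub`) that shows the idea: debug builds verify the downcast with `dynamic_cast`, and release builds (`NDEBUG`) pay only for a `static_cast`.

   ```cpp
   #include <cassert>

   // Hypothetical checked downcast for pointers: verified in debug builds,
   // as cheap as static_cast when NDEBUG is defined.
   template <typename To, typename From>
   To checked_ptr_cast(From* ptr) {
   #ifndef NDEBUG
     To result = dynamic_cast<To>(ptr);
     assert(ptr == nullptr || result != nullptr);  // wrong dynamic type
     return result;
   #else
     return static_cast<To>(ptr);
   #endif
   }

   // Minimal polymorphic stand-ins for Listener / CollectListener.
   struct ListenerStub {
     virtual ~ListenerStub() = default;
   };
   struct CollectListenerStub : ListenerStub {
     int num_record_batches = 0;
   };
   ```

   Compared with a bare `static_cast`, passing a listener of the wrong type trips an assertion in debug builds instead of silently producing undefined behavior.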



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number (", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());
+    while (collect_listener->num_record_batches() == 0) {

Review Comment:
   Should we also check that `state() != EOS`?
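   The following heavily simplified, hypothetical model of the pull-over-push loop in `ReadNext()` shows why the `state() != EOS` check matters. Once the decoder has seen an end-of-stream marker, no further batch can arrive, so the loop should stop instead of pulling more messages. All names here (`DecoderSketch`, `ReaderSketch`, `Message`, `State`) are illustrative and are not Arrow's API.

   ```cpp
   #include <deque>
   #include <optional>
   #include <string>
   #include <utility>

   enum class State { kRecordBatches, kEOS };

   struct Message {
     bool is_eos;          // true for an end-of-stream marker
     std::string payload;  // stands in for a decoded record batch
   };

   // Push side: consumes one message at a time and collects batches.
   class DecoderSketch {
    public:
     void OnMessageDecoded(const Message& message) {
       if (message.is_eos) {
         state_ = State::kEOS;
         return;
       }
       collected_.push_back(message.payload);
     }
     State state() const { return state_; }

    protected:
     std::deque<std::string> collected_;
     State state_ = State::kRecordBatches;
   };

   // Pull side: feeds messages to the decoder until a batch is collected.
   class ReaderSketch : public DecoderSketch {
    public:
     explicit ReaderSketch(std::deque<Message> input) : input_(std::move(input)) {}

     // Returns the next batch, or std::nullopt at end of stream.
     std::optional<std::string> ReadNext() {
       // Stop once EOS has been decoded; otherwise keep pulling messages.
       while (collected_.empty() && state() != State::kEOS) {
         if (input_.empty()) {
           state_ = State::kEOS;  // input ran out without an EOS marker
           break;
         }
         Message message = std::move(input_.front());
         input_.pop_front();
         OnMessageDecoded(message);
       }
       if (collected_.empty()) return std::nullopt;
       std::string batch = std::move(collected_.front());
       collected_.pop_front();
       return batch;
     }

    private:
     std::deque<Message> input_;
   };
   ```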



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -251,7 +251,8 @@ class ARROW_EXPORT Listener {
   /// \see StreamDecoder
   virtual Status OnEOS();
 
-  /// \brief Called when a record batch is decoded.
+  /// \brief Called when a record batch is decoded and
+  /// OnReocrdBatchDecoded() isn't overrided.

Review Comment:
   ```suggestion
     /// \brief Called when a record batch is decoded and
     /// OnRecordBatchWithMetadataDecoded() isn't overridden.
   ```
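   Here is a hedged sketch of the backward-compatible dispatch that this doc comment describes, following the PR description. The decoder calls the new hook, and the new hook's default implementation forwards to the legacy `OnRecordBatchDecoded()`. The types are hypothetical stand-ins, and `bool` replaces `arrow::Status` to keep the sketch self-contained.

   ```cpp
   #include <memory>
   #include <string>
   #include <utility>

   // Hypothetical stand-ins for RecordBatch and RecordBatchWithMetadata.
   struct BatchStub {
     std::string name;
   };
   struct BatchWithMetadataStub {
     std::shared_ptr<BatchStub> batch;
     std::shared_ptr<std::string> custom_metadata;
   };

   class ListenerSketch {
    public:
     virtual ~ListenerSketch() = default;

     // Legacy hook. Only reached through the default implementation below,
     // i.e. when OnRecordBatchWithMetadataDecoded() isn't overridden.
     // Returns false for "not handled" (Arrow would return a Status instead).
     virtual bool OnRecordBatchDecoded(std::shared_ptr<BatchStub> batch) {
       (void)batch;
       return false;
     }

     // New hook, called by the decoder. The default forwards to the legacy
     // hook so that existing subclasses keep working unchanged.
     virtual bool OnRecordBatchWithMetadataDecoded(BatchWithMetadataStub value) {
       return OnRecordBatchDecoded(std::move(value.batch));
     }
   };

   // An existing subclass that only knows about the legacy hook.
   class LegacyListener : public ListenerSketch {
    public:
     bool OnRecordBatchDecoded(std::shared_ptr<BatchStub> batch) override {
       seen_name = batch->name;
       return true;
     }
     std::string seen_name;
   };

   // A new subclass that also wants the custom metadata.
   class MetadataAwareListener : public ListenerSketch {
    public:
     bool OnRecordBatchWithMetadataDecoded(BatchWithMetadataStub value) override {
       seen_metadata = value.custom_metadata ? *value.custom_metadata : std::string();
       return true;
     }
     std::string seen_metadata;
   };
   ```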



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +323,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }

Review Comment:
   I think this is ok.



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   Is `schema_` used anywhere?



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }
+
+  /// \return the number of collected record batches
+  size_t num_record_batches() const { return record_batches_.size(); }
+
+  /// \return the last decoded record batch and remove it from
+  /// record_batches
+  std::shared_ptr<RecordBatch> PopRecordBatch() {
+    auto record_batch_with_metadata = PopRecordBatchWithMetadata();
+    return std::move(record_batch_with_metadata.batch);
+  }
+
+  /// \return the last decoded record batch with custom metadata and
+  /// remove it from record_batches
+  RecordBatchWithMetadata PopRecordBatchWithMetadata() {
+    RecordBatchWithMetadata record_batch_with_metadata;
+    if (record_batches_.empty()) {
+      return record_batch_with_metadata;
+    }
+    record_batch_with_metadata.batch =
+        std::move(record_batches_[record_batches_.size() - 1]);
+    record_batch_with_metadata.custom_metadata =
+        std::move(metadatas_[metadatas_.size() - 1]);

Review Comment:
   ```suggestion
       record_batch_with_metadata.batch = std::move(record_batches_.back());
       record_batch_with_metadata.custom_metadata = std::move(metadatas_.back());
   ```
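   The suggestion is applied below in a self-contained sketch that uses hypothetical types, with `std::string` standing in for `RecordBatch` and `KeyValueMetadata`. Moving out of `back()` leaves a moved-from element in the vector, so the sketch also calls `pop_back()` to actually remove it, as the doc comment promises. The hunk above is truncated, so it is not visible here whether the PR already does this.

   ```cpp
   #include <cstddef>
   #include <memory>
   #include <string>
   #include <utility>
   #include <vector>

   // Hypothetical stand-in for RecordBatchWithMetadata.
   struct PoppedSketch {
     std::shared_ptr<std::string> batch;
     std::shared_ptr<std::string> custom_metadata;
   };

   class PopCollectorSketch {
    public:
     void Add(std::shared_ptr<std::string> batch, std::shared_ptr<std::string> metadata) {
       batches_.push_back(std::move(batch));
       metadatas_.push_back(std::move(metadata));
     }

     // Returns the most recently collected batch and its metadata and removes
     // them; returns an empty result when nothing has been collected.
     PoppedSketch PopRecordBatchWithMetadata() {
       PoppedSketch result;
       if (batches_.empty()) return result;
       result.batch = std::move(batches_.back());
       result.custom_metadata = std::move(metadatas_.back());
       batches_.pop_back();
       metadatas_.pop_back();
       return result;
     }

     std::size_t size() const { return batches_.size(); }

    private:
     std::vector<std::shared_ptr<std::string>> batches_;
     std::vector<std::shared_ptr<std::string>> metadatas_;
   };
   ```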



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }

Review Comment:
   But why is this returning a copy? It could return a const-ref. Same for `record_batches()`...
   



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }
+
+  /// \return the number of collected record batches
+  size_t num_record_batches() const { return record_batches_.size(); }

Review Comment:
   Return `int64_t` here?


