You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "pitrou (via GitHub)" <gi...@apache.org> on 2023/06/28 10:38:26 UTC

[GitHub] [arrow] pitrou commented on a diff in pull request #36344: GH-26153: [C++] Share common codes for RecordBatchStreamReader and StreamDecoder

pitrou commented on code in PR #36344:
URL: https://github.com/apache/arrow/pull/36344#discussion_r1245006878


##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number (", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());

Review Comment:
   Should this use `checked_cast` instead of `static_cast` here?



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Schema> out_schema_;
+  ReadStats stats_;
+  bool swap_endian_;
+};
 
-      if (message->type() != MessageType::DICTIONARY_BATCH) {
-        return Status::Invalid("IPC stream did not have the expected number (", num_dicts,
-                               ") of dictionaries at the start of the stream");
-      }
-      RETURN_NOT_OK(ReadDictionary(*message));
+// ----------------------------------------------------------------------
+// RecordBatchStreamReader implementation
+
+class RecordBatchStreamReaderImpl : public RecordBatchStreamReader,
+                                    public StreamDecoderInternal {
+ public:
+  RecordBatchStreamReaderImpl(std::unique_ptr<MessageReader> message_reader,
+                              const IpcReadOptions& options)
+      : RecordBatchStreamReader(),
+        StreamDecoderInternal(std::make_shared<CollectListener>(), options),
+        message_reader_(std::move(message_reader)) {}
+
+  Status Init() {
+    // Read schema
+    ARROW_ASSIGN_OR_RAISE(auto message, message_reader_->ReadNextMessage());
+    if (!message) {
+      return Status::Invalid("Tried reading schema message, was null or length 0");
     }
+    return OnMessageDecoded(std::move(message));
+  }
 
-    have_read_initial_dictionaries_ = true;
+  Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
+    ARROW_ASSIGN_OR_RAISE(auto batch_with_metadata, ReadNext());
+    *batch = std::move(batch_with_metadata.batch);
     return Status::OK();
   }
 
-  std::unique_ptr<MessageReader> message_reader_;
-  IpcReadOptions options_;
-  std::vector<bool> field_inclusion_mask_;
-
-  bool have_read_initial_dictionaries_ = false;
-
-  // Flag to set in case where we fail to observe all dictionaries in a stream,
-  // and so the reader should not attempt to parse any messages
-  bool empty_stream_ = false;
+  Result<RecordBatchWithMetadata> ReadNext() override {
+    auto collect_listener = static_cast<CollectListener*>(raw_listener());
+    while (collect_listener->num_record_batches() == 0) {

Review Comment:
   Should we also check that `state() != EOS`?



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -251,7 +251,8 @@ class ARROW_EXPORT Listener {
   /// \see StreamDecoder
   virtual Status OnEOS();
 
-  /// \brief Called when a record batch is decoded.
+  /// \brief Called when a record batch is decoded and
+  /// OnReocrdBatchDecoded() isn't overrided.

Review Comment:
   ```suggestion
     /// \brief Called when a record batch is decoded and
     /// OnRecordBatchWithMetadataDecoded() isn't overridden.
   ```



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +323,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }

Review Comment:
   I think this is ok.



##########
cpp/src/arrow/ipc/reader.cc:
##########
@@ -951,60 +977,85 @@ class RecordBatchStreamReaderImpl : public RecordBatchStreamReader {
     return Status::OK();
   }
 
-  Status ReadInitialDictionaries() {
-    // We must receive all dictionaries before reconstructing the
-    // first record batch. Subsequent dictionary deltas modify the memo
-    std::unique_ptr<Message> message;
-
-    // TODO(wesm): In future, we may want to reconcile the ids in the stream with
-    // those found in the schema
-    const auto num_dicts = dictionary_memo_.fields().num_dicts();
-    for (int i = 0; i < num_dicts; ++i) {
-      ARROW_ASSIGN_OR_RAISE(message, ReadNextMessage());
-      if (!message) {
-        if (i == 0) {
-          /// ARROW-6006: If we fail to find any dictionaries in the stream, then
-          /// it may be that the stream has a schema but no actual data. In such
-          /// case we communicate that we were unable to find the dictionaries
-          /// (but there was no failure otherwise), so the caller can decide what
-          /// to do
-          empty_stream_ = true;
-          break;
-        } else {
-          // ARROW-6126, the stream terminated before receiving the expected
-          // number of dictionaries
-          return Status::Invalid("IPC stream ended without reading the expected number (",
-                                 num_dicts, ") of dictionaries");
-        }
-      }
+  std::shared_ptr<Listener> listener_;
+  const IpcReadOptions options_;
+  State state_;
+  std::vector<bool> field_inclusion_mask_;
+  int num_required_initial_dictionaries_;
+  int num_read_initial_dictionaries_;
+  DictionaryMemo dictionary_memo_;
+  std::shared_ptr<Schema> schema_;

Review Comment:
   Is `schema_` used anywhere?



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }
+
+  /// \return the number of collected record batches
+  size_t num_record_batches() const { return record_batches_.size(); }
+
+  /// \return the last decoded record batch and remove it from
+  /// record_batches
+  std::shared_ptr<RecordBatch> PopRecordBatch() {
+    auto record_batch_with_metadata = PopRecordBatchWithMetadata();
+    return std::move(record_batch_with_metadata.batch);
+  }
+
+  /// \return the last decoded record batch with custom metadata and
+  /// remove it from record_batches
+  RecordBatchWithMetadata PopRecordBatchWithMetadata() {
+    RecordBatchWithMetadata record_batch_with_metadata;
+    if (record_batches_.empty()) {
+      return record_batch_with_metadata;
+    }
+    record_batch_with_metadata.batch =
+        std::move(record_batches_[record_batches_.size() - 1]);
+    record_batch_with_metadata.custom_metadata =
+        std::move(metadatas_[metadatas_.size() - 1]);

Review Comment:
   ```suggestion
       record_batch_with_metadata.batch = std::move(record_batches_.back());
       record_batch_with_metadata.custom_metadata = std::move(metadatas_.back());
   ```



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }

Review Comment:
   But why is this returning a copy? It could return a const-ref. Same for `record_batches()`...
   



##########
cpp/src/arrow/ipc/reader.h:
##########
@@ -301,9 +317,39 @@ class ARROW_EXPORT CollectListener : public Listener {
     return record_batches_;
   }
 
+  /// \return the all decoded metadatas
+  std::vector<std::shared_ptr<KeyValueMetadata>> metadatas() const { return metadatas_; }
+
+  /// \return the number of collected record batches
+  size_t num_record_batches() const { return record_batches_.size(); }

Review Comment:
   Should this return `int64_t` rather than `size_t`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org