You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2021/09/06 10:45:34 UTC

[arrow] branch master updated: ARROW-13793: [C++] Migrate ORCFileReader to Result<T>

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c83db7e  ARROW-13793: [C++] Migrate ORCFileReader to Result<T>
c83db7e is described below

commit c83db7e19623c5a9a17ff9ed2eab16a51e29dc46
Author: Junwang Zhao <zh...@gmail.com>
AuthorDate: Mon Sep 6 12:43:28 2021 +0200

    ARROW-13793: [C++] Migrate ORCFileReader to Result<T>
    
    Signed-off-by: Junwang Zhao <zh...@gmail.com>
    
    Closes #11065 from zhjwpku/cpp/migrate_orcfilereader_to_result
    
    Authored-by: Junwang Zhao <zh...@gmail.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/arrow/adapters/orc/adapter.cc      | 68 ++++++++++++++++++++-
 cpp/src/arrow/adapters/orc/adapter.h       | 94 ++++++++++++++++++++++++++++++
 cpp/src/arrow/adapters/orc/adapter_test.cc | 21 +++----
 3 files changed, 169 insertions(+), 14 deletions(-)

diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc
index 2f74b40..94a3b6e 100644
--- a/cpp/src/arrow/adapters/orc/adapter.cc
+++ b/cpp/src/arrow/adapters/orc/adapter.cc
@@ -430,10 +430,14 @@ ORCFileReader::~ORCFileReader() {}
 
 Status ORCFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
                            MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader) {
+  return Open(file, pool).Value(reader);
+}
+
+Result<std::unique_ptr<ORCFileReader>> ORCFileReader::Open(
+    const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
   auto result = std::unique_ptr<ORCFileReader>(new ORCFileReader());
   RETURN_NOT_OK(result->impl_->Open(file, pool));
-  *reader = std::move(result);
-  return Status::OK();
+  return std::move(result);
 }
 
 Result<std::shared_ptr<const KeyValueMetadata>> ORCFileReader::ReadMetadata() {
@@ -444,33 +448,79 @@ Status ORCFileReader::ReadSchema(std::shared_ptr<Schema>* out) {
   return impl_->ReadSchema(out);
 }
 
+Result<std::shared_ptr<Schema>> ORCFileReader::ReadSchema() {
+  std::shared_ptr<Schema> schema;
+  RETURN_NOT_OK(impl_->ReadSchema(&schema));
+  return schema;
+}
+
 Status ORCFileReader::Read(std::shared_ptr<Table>* out) { return impl_->Read(out); }
 
+Result<std::shared_ptr<Table>> ORCFileReader::Read() {
+  std::shared_ptr<Table> table;
+  RETURN_NOT_OK(impl_->Read(&table));
+  return table;
+}
+
 Status ORCFileReader::Read(const std::shared_ptr<Schema>& schema,
                            std::shared_ptr<Table>* out) {
   return impl_->Read(schema, out);
 }
 
+Result<std::shared_ptr<Table>> ORCFileReader::Read(
+    const std::shared_ptr<Schema>& schema) {
+  std::shared_ptr<Table> table;
+  RETURN_NOT_OK(impl_->Read(schema, &table));
+  return table;
+}
+
 Status ORCFileReader::Read(const std::vector<int>& include_indices,
                            std::shared_ptr<Table>* out) {
   return impl_->Read(include_indices, out);
 }
 
+Result<std::shared_ptr<Table>> ORCFileReader::Read(
+    const std::vector<int>& include_indices) {
+  std::shared_ptr<Table> table;
+  RETURN_NOT_OK(impl_->Read(include_indices, &table));
+  return table;
+}
+
 Status ORCFileReader::Read(const std::shared_ptr<Schema>& schema,
                            const std::vector<int>& include_indices,
                            std::shared_ptr<Table>* out) {
   return impl_->Read(schema, include_indices, out);
 }
 
+Result<std::shared_ptr<Table>> ORCFileReader::Read(
+    const std::shared_ptr<Schema>& schema, const std::vector<int>& include_indices) {
+  std::shared_ptr<Table> table;
+  RETURN_NOT_OK(impl_->Read(schema, include_indices, &table));
+  return table;
+}
+
 Status ORCFileReader::ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
   return impl_->ReadStripe(stripe, out);
 }
 
+Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(int64_t stripe) {
+  std::shared_ptr<RecordBatch> recordBatch;
+  RETURN_NOT_OK(impl_->ReadStripe(stripe, &recordBatch));
+  return recordBatch;
+}
+
 Status ORCFileReader::ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
                                  std::shared_ptr<RecordBatch>* out) {
   return impl_->ReadStripe(stripe, include_indices, out);
 }
 
+Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(
+    int64_t stripe, const std::vector<int>& include_indices) {
+  std::shared_ptr<RecordBatch> recordBatch;
+  RETURN_NOT_OK(impl_->ReadStripe(stripe, include_indices, &recordBatch));
+  return recordBatch;
+}
+
 Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); }
 
 Status ORCFileReader::NextStripeReader(int64_t batch_sizes,
@@ -478,12 +528,26 @@ Status ORCFileReader::NextStripeReader(int64_t batch_sizes,
   return impl_->NextStripeReader(batch_sizes, out);
 }
 
+Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
+    int64_t batch_size) {
+  std::shared_ptr<RecordBatchReader> reader;
+  RETURN_NOT_OK(impl_->NextStripeReader(batch_size, &reader));
+  return reader;
+}
+
 Status ORCFileReader::NextStripeReader(int64_t batch_size,
                                        const std::vector<int>& include_indices,
                                        std::shared_ptr<RecordBatchReader>* out) {
   return impl_->NextStripeReader(batch_size, include_indices, out);
 }
 
+Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
+    int64_t batch_size, const std::vector<int>& include_indices) {
+  std::shared_ptr<RecordBatchReader> reader;
+  RETURN_NOT_OK(impl_->NextStripeReader(batch_size, include_indices, &reader));
+  return reader;
+}
+
 int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h
index 012c170..0367951 100644
--- a/cpp/src/arrow/adapters/orc/adapter.h
+++ b/cpp/src/arrow/adapters/orc/adapter.h
@@ -27,6 +27,7 @@
 #include "arrow/status.h"
 #include "arrow/type.h"
 #include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -45,9 +46,18 @@ class ARROW_EXPORT ORCFileReader {
   /// \param[in] pool a MemoryPool to use for buffer allocations
   /// \param[out] reader the returned reader object
   /// \return Status
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool,
                      std::unique_ptr<ORCFileReader>* reader);
 
+  /// \brief Creates a new ORC reader
+  ///
+  /// \param[in] file the data source
+  /// \param[in] pool a MemoryPool to use for buffer allocations
+  /// \return the returned reader object
+  static Result<std::unique_ptr<ORCFileReader>> Open(
+      const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool);
+
   /// \brief Return the metadata read from the ORC file
   ///
   /// \return A KeyValueMetadata object containing the ORC metadata
@@ -56,55 +66,114 @@ class ARROW_EXPORT ORCFileReader {
   /// \brief Return the schema read from the ORC file
   ///
   /// \param[out] out the returned Schema object
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status ReadSchema(std::shared_ptr<Schema>* out);
 
+  /// \brief Return the schema read from the ORC file
+  ///
+  /// \return the returned Schema object
+  Result<std::shared_ptr<Schema>> ReadSchema();
+
   /// \brief Read the file as a Table
   ///
   /// The table will be composed of one record batch per stripe.
   ///
   /// \param[out] out the returned Table
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status Read(std::shared_ptr<Table>* out);
 
   /// \brief Read the file as a Table
   ///
   /// The table will be composed of one record batch per stripe.
   ///
+  /// \return the returned Table
+  Result<std::shared_ptr<Table>> Read();
+
+  /// \brief Read the file as a Table
+  ///
+  /// The table will be composed of one record batch per stripe.
+  ///
   /// \param[in] schema the Table schema
   /// \param[out] out the returned Table
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status Read(const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out);
 
   /// \brief Read the file as a Table
   ///
   /// The table will be composed of one record batch per stripe.
   ///
+  /// \param[in] schema the Table schema
+  /// \return the returned Table
+  Result<std::shared_ptr<Table>> Read(const std::shared_ptr<Schema>& schema);
+
+  /// \brief Read the file as a Table
+  ///
+  /// The table will be composed of one record batch per stripe.
+  ///
   /// \param[in] include_indices the selected field indices to read
   /// \param[out] out the returned Table
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out);
 
   /// \brief Read the file as a Table
   ///
   /// The table will be composed of one record batch per stripe.
   ///
+  /// \param[in] include_indices the selected field indices to read
+  /// \return the returned Table
+  Result<std::shared_ptr<Table>> Read(const std::vector<int>& include_indices);
+
+  /// \brief Read the file as a Table
+  ///
+  /// The table will be composed of one record batch per stripe.
+  ///
   /// \param[in] schema the Table schema
   /// \param[in] include_indices the selected field indices to read
   /// \param[out] out the returned Table
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status Read(const std::shared_ptr<Schema>& schema,
               const std::vector<int>& include_indices, std::shared_ptr<Table>* out);
 
+  /// \brief Read the file as a Table
+  ///
+  /// The table will be composed of one record batch per stripe.
+  ///
+  /// \param[in] schema the Table schema
+  /// \param[in] include_indices the selected field indices to read
+  /// \return the returned Table
+  Result<std::shared_ptr<Table>> Read(const std::shared_ptr<Schema>& schema,
+                                      const std::vector<int>& include_indices);
+
   /// \brief Read a single stripe as a RecordBatch
   ///
   /// \param[in] stripe the stripe index
   /// \param[out] out the returned RecordBatch
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out);
 
   /// \brief Read a single stripe as a RecordBatch
   ///
   /// \param[in] stripe the stripe index
+  /// \return the returned RecordBatch
+  Result<std::shared_ptr<RecordBatch>> ReadStripe(int64_t stripe);
+
+  /// \brief Read a single stripe as a RecordBatch
+  ///
+  /// \param[in] stripe the stripe index
   /// \param[in] include_indices the selected field indices to read
   /// \param[out] out the returned RecordBatch
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
                     std::shared_ptr<RecordBatch>* out);
 
+  /// \brief Read a single stripe as a RecordBatch
+  ///
+  /// \param[in] stripe the stripe index
+  /// \param[in] include_indices the selected field indices to read
+  /// \return the returned RecordBatch
+  Result<std::shared_ptr<RecordBatch>> ReadStripe(
+      int64_t stripe, const std::vector<int>& include_indices);
+
   /// \brief Seek to designated row. Invoke NextStripeReader() after seek
   ///        will return stripe reader starting from designated row.
   ///
@@ -119,6 +188,7 @@ class ARROW_EXPORT ORCFileReader {
   /// \param[in] batch_size the number of rows each record batch contains in
   ///            record batch iteration.
   /// \param[out] out the returned stripe reader
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out);
 
   /// \brief Get a stripe level record batch iterator with specified row count
@@ -126,14 +196,38 @@ class ARROW_EXPORT ORCFileReader {
   ///         alternative to ReadStripe which may cause OOM issue by loading
   ///         the whole stripes into memory.
   ///
+  /// \param[in] batch_size the number of rows each record batch contains in
+  ///            record batch iteration.
+  /// \return the returned stripe reader
+  Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(int64_t batch_size);
+
+  /// \brief Get a stripe level record batch iterator with specified row count
+  ///         in each record batch. NextStripeReader serves as a fine grain
+  ///         alternative to ReadStripe which may cause OOM issue by loading
+  ///         the whole stripes into memory.
+  ///
   /// \param[in] batch_size Get a stripe level record batch iterator with specified row
   /// count in each record batch.
   ///
   /// \param[in] include_indices the selected field indices to read
   /// \param[out] out the returned stripe reader
+  ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
   Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
                           std::shared_ptr<RecordBatchReader>* out);
 
+  /// \brief Get a stripe level record batch iterator with specified row count
+  ///         in each record batch. NextStripeReader serves as a fine grain
+  ///         alternative to ReadStripe which may cause OOM issue by loading
+  ///         the whole stripes into memory.
+  ///
+  /// \param[in] batch_size Get a stripe level record batch iterator with specified row
+  /// count in each record batch.
+  ///
+  /// \param[in] include_indices the selected field indices to read
+  /// \return the returned stripe reader
+  Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
+      int64_t batch_size, const std::vector<int>& include_indices);
+
   /// \brief The number of stripes in the file
   int64_t NumberOfStripes();
 
diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc
index 9f7fb56..39c66b9 100644
--- a/cpp/src/arrow/adapters/orc/adapter_test.cc
+++ b/cpp/src/arrow/adapters/orc/adapter_test.cc
@@ -237,13 +237,12 @@ void AssertTableWriteReadEqual(const std::shared_ptr<Table>& input_table,
   ARROW_EXPECT_OK(writer->Close());
   EXPECT_OK_AND_ASSIGN(auto buffer, buffer_output_stream->Finish());
   std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader(buffer));
-  std::unique_ptr<adapters::orc::ORCFileReader> reader;
-  ARROW_EXPECT_OK(
-      adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader));
-  std::shared_ptr<Table> actual_output_table;
-  ARROW_EXPECT_OK(reader->Read(&actual_output_table));
+  EXPECT_OK_AND_ASSIGN(
+      auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+  EXPECT_OK_AND_ASSIGN(auto actual_output_table, reader->Read());
   AssertTablesEqual(*expected_output_table, *actual_output_table, false, false);
 }
+
 void AssertArrayWriteReadEqual(const std::shared_ptr<Array>& input_array,
                                const std::shared_ptr<Array>& expected_output_array,
                                const int64_t max_size = kDefaultSmallMemStreamSize) {
@@ -323,9 +322,8 @@ TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) {
       std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()),
                                static_cast<int64_t>(mem_stream.getLength()))));
 
-  std::unique_ptr<adapters::orc::ORCFileReader> reader;
-  ASSERT_TRUE(
-      adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader).ok());
+  ASSERT_OK_AND_ASSIGN(
+      auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
 
   EXPECT_OK_AND_ASSIGN(auto metadata, reader->ReadMetadata());
   auto expected_metadata = std::const_pointer_cast<const KeyValueMetadata>(
@@ -334,8 +332,7 @@ TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) {
   ASSERT_EQ(stripe_row_count * stripe_count, reader->NumberOfRows());
   ASSERT_EQ(stripe_count, reader->NumberOfStripes());
   accumulated = 0;
-  std::shared_ptr<RecordBatchReader> stripe_reader;
-  EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+  EXPECT_OK_AND_ASSIGN(auto stripe_reader, reader->NextStripeReader(reader_batch_size));
   while (stripe_reader) {
     std::shared_ptr<RecordBatch> record_batch;
     EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
@@ -350,14 +347,14 @@ TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) {
       }
       EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
     }
-    EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+    EXPECT_OK_AND_ASSIGN(stripe_reader, reader->NextStripeReader(reader_batch_size));
   }
 
   // test seek operation
   int64_t start_offset = 830;
   EXPECT_TRUE(reader->Seek(stripe_row_count + start_offset).ok());
 
-  EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+  EXPECT_OK_AND_ASSIGN(stripe_reader, reader->NextStripeReader(reader_batch_size));
   std::shared_ptr<RecordBatch> record_batch;
   EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
   while (record_batch) {