You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this text.
Posted to commits@arrow.apache.org by ap...@apache.org on 2021/09/06 10:45:34 UTC
[arrow] branch master updated: ARROW-13793: [C++] Migrate
ORCFileReader to Result<T>
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new c83db7e ARROW-13793: [C++] Migrate ORCFileReader to Result<T>
c83db7e is described below
commit c83db7e19623c5a9a17ff9ed2eab16a51e29dc46
Author: Junwang Zhao <zh...@gmail.com>
AuthorDate: Mon Sep 6 12:43:28 2021 +0200
ARROW-13793: [C++] Migrate ORCFileReader to Result<T>
Signed-off-by: Junwang Zhao <zh...@gmail.com>
Closes #11065 from zhjwpku/cpp/migrate_orcfilereader_to_result
Authored-by: Junwang Zhao <zh...@gmail.com>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/arrow/adapters/orc/adapter.cc | 68 ++++++++++++++++++++-
cpp/src/arrow/adapters/orc/adapter.h | 94 ++++++++++++++++++++++++++++++
cpp/src/arrow/adapters/orc/adapter_test.cc | 21 +++----
3 files changed, 169 insertions(+), 14 deletions(-)
diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc
index 2f74b40..94a3b6e 100644
--- a/cpp/src/arrow/adapters/orc/adapter.cc
+++ b/cpp/src/arrow/adapters/orc/adapter.cc
@@ -430,10 +430,14 @@ ORCFileReader::~ORCFileReader() {}
Status ORCFileReader::Open(const std::shared_ptr<io::RandomAccessFile>& file,
MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader) {
+ return Open(file, pool).Value(reader);
+}
+
+Result<std::unique_ptr<ORCFileReader>> ORCFileReader::Open(
+ const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool) {
auto result = std::unique_ptr<ORCFileReader>(new ORCFileReader());
RETURN_NOT_OK(result->impl_->Open(file, pool));
- *reader = std::move(result);
- return Status::OK();
+ return std::move(result);
}
Result<std::shared_ptr<const KeyValueMetadata>> ORCFileReader::ReadMetadata() {
@@ -444,33 +448,79 @@ Status ORCFileReader::ReadSchema(std::shared_ptr<Schema>* out) {
return impl_->ReadSchema(out);
}
+Result<std::shared_ptr<Schema>> ORCFileReader::ReadSchema() {
+ std::shared_ptr<Schema> schema;
+ RETURN_NOT_OK(impl_->ReadSchema(&schema));
+ return schema;
+}
+
Status ORCFileReader::Read(std::shared_ptr<Table>* out) { return impl_->Read(out); }
+Result<std::shared_ptr<Table>> ORCFileReader::Read() {
+ std::shared_ptr<Table> table;
+ RETURN_NOT_OK(impl_->Read(&table));
+ return table;
+}
+
Status ORCFileReader::Read(const std::shared_ptr<Schema>& schema,
std::shared_ptr<Table>* out) {
return impl_->Read(schema, out);
}
+Result<std::shared_ptr<Table>> ORCFileReader::Read(
+ const std::shared_ptr<Schema>& schema) {
+ std::shared_ptr<Table> table;
+ RETURN_NOT_OK(impl_->Read(schema, &table));
+ return table;
+}
+
Status ORCFileReader::Read(const std::vector<int>& include_indices,
std::shared_ptr<Table>* out) {
return impl_->Read(include_indices, out);
}
+Result<std::shared_ptr<Table>> ORCFileReader::Read(
+ const std::vector<int>& include_indices) {
+ std::shared_ptr<Table> table;
+ RETURN_NOT_OK(impl_->Read(include_indices, &table));
+ return table;
+}
+
Status ORCFileReader::Read(const std::shared_ptr<Schema>& schema,
const std::vector<int>& include_indices,
std::shared_ptr<Table>* out) {
return impl_->Read(schema, include_indices, out);
}
+Result<std::shared_ptr<Table>> ORCFileReader::Read(
+ const std::shared_ptr<Schema>& schema, const std::vector<int>& include_indices) {
+ std::shared_ptr<Table> table;
+ RETURN_NOT_OK(impl_->Read(schema, include_indices, &table));
+ return table;
+}
+
Status ORCFileReader::ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
return impl_->ReadStripe(stripe, out);
}
+Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(int64_t stripe) {
+ std::shared_ptr<RecordBatch> recordBatch;
+ RETURN_NOT_OK(impl_->ReadStripe(stripe, &recordBatch));
+ return recordBatch;
+}
+
Status ORCFileReader::ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
std::shared_ptr<RecordBatch>* out) {
return impl_->ReadStripe(stripe, include_indices, out);
}
+Result<std::shared_ptr<RecordBatch>> ORCFileReader::ReadStripe(
+ int64_t stripe, const std::vector<int>& include_indices) {
+ std::shared_ptr<RecordBatch> recordBatch;
+ RETURN_NOT_OK(impl_->ReadStripe(stripe, include_indices, &recordBatch));
+ return recordBatch;
+}
+
Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); }
Status ORCFileReader::NextStripeReader(int64_t batch_sizes,
@@ -478,12 +528,26 @@ Status ORCFileReader::NextStripeReader(int64_t batch_sizes,
return impl_->NextStripeReader(batch_sizes, out);
}
+Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
+ int64_t batch_size) {
+ std::shared_ptr<RecordBatchReader> reader;
+ RETURN_NOT_OK(impl_->NextStripeReader(batch_size, &reader));
+ return reader;
+}
+
Status ORCFileReader::NextStripeReader(int64_t batch_size,
const std::vector<int>& include_indices,
std::shared_ptr<RecordBatchReader>* out) {
return impl_->NextStripeReader(batch_size, include_indices, out);
}
+Result<std::shared_ptr<RecordBatchReader>> ORCFileReader::NextStripeReader(
+ int64_t batch_size, const std::vector<int>& include_indices) {
+ std::shared_ptr<RecordBatchReader> reader;
+ RETURN_NOT_OK(impl_->NextStripeReader(batch_size, include_indices, &reader));
+ return reader;
+}
+
int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h
index 012c170..0367951 100644
--- a/cpp/src/arrow/adapters/orc/adapter.h
+++ b/cpp/src/arrow/adapters/orc/adapter.h
@@ -27,6 +27,7 @@
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
+#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
namespace arrow {
@@ -45,9 +46,18 @@ class ARROW_EXPORT ORCFileReader {
/// \param[in] pool a MemoryPool to use for buffer allocations
/// \param[out] reader the returned reader object
/// \return Status
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
static Status Open(const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool,
std::unique_ptr<ORCFileReader>* reader);
+ /// \brief Creates a new ORC reader
+ ///
+ /// \param[in] file the data source
+ /// \param[in] pool a MemoryPool to use for buffer allocations
+ /// \return the returned reader object
+ static Result<std::unique_ptr<ORCFileReader>> Open(
+ const std::shared_ptr<io::RandomAccessFile>& file, MemoryPool* pool);
+
/// \brief Return the metadata read from the ORC file
///
/// \return A KeyValueMetadata object containing the ORC metadata
@@ -56,55 +66,114 @@ class ARROW_EXPORT ORCFileReader {
/// \brief Return the schema read from the ORC file
///
/// \param[out] out the returned Schema object
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status ReadSchema(std::shared_ptr<Schema>* out);
+ /// \brief Return the schema read from the ORC file
+ ///
+ /// \return the returned Schema object
+ Result<std::shared_ptr<Schema>> ReadSchema();
+
/// \brief Read the file as a Table
///
/// The table will be composed of one record batch per stripe.
///
/// \param[out] out the returned Table
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status Read(std::shared_ptr<Table>* out);
/// \brief Read the file as a Table
///
/// The table will be composed of one record batch per stripe.
///
+ /// \return the returned Table
+ Result<std::shared_ptr<Table>> Read();
+
+ /// \brief Read the file as a Table
+ ///
+ /// The table will be composed of one record batch per stripe.
+ ///
/// \param[in] schema the Table schema
/// \param[out] out the returned Table
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status Read(const std::shared_ptr<Schema>& schema, std::shared_ptr<Table>* out);
/// \brief Read the file as a Table
///
/// The table will be composed of one record batch per stripe.
///
+ /// \param[in] schema the Table schema
+ /// \return the returned Table
+ Result<std::shared_ptr<Table>> Read(const std::shared_ptr<Schema>& schema);
+
+ /// \brief Read the file as a Table
+ ///
+ /// The table will be composed of one record batch per stripe.
+ ///
/// \param[in] include_indices the selected field indices to read
/// \param[out] out the returned Table
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out);
/// \brief Read the file as a Table
///
/// The table will be composed of one record batch per stripe.
///
+ /// \param[in] include_indices the selected field indices to read
+ /// \return the returned Table
+ Result<std::shared_ptr<Table>> Read(const std::vector<int>& include_indices);
+
+ /// \brief Read the file as a Table
+ ///
+ /// The table will be composed of one record batch per stripe.
+ ///
/// \param[in] schema the Table schema
/// \param[in] include_indices the selected field indices to read
/// \param[out] out the returned Table
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status Read(const std::shared_ptr<Schema>& schema,
const std::vector<int>& include_indices, std::shared_ptr<Table>* out);
+ /// \brief Read the file as a Table
+ ///
+ /// The table will be composed of one record batch per stripe.
+ ///
+ /// \param[in] schema the Table schema
+ /// \param[in] include_indices the selected field indices to read
+ /// \return the returned Table
+ Result<std::shared_ptr<Table>> Read(const std::shared_ptr<Schema>& schema,
+ const std::vector<int>& include_indices);
+
/// \brief Read a single stripe as a RecordBatch
///
/// \param[in] stripe the stripe index
/// \param[out] out the returned RecordBatch
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out);
/// \brief Read a single stripe as a RecordBatch
///
/// \param[in] stripe the stripe index
+ /// \return the returned RecordBatch
+ Result<std::shared_ptr<RecordBatch>> ReadStripe(int64_t stripe);
+
+ /// \brief Read a single stripe as a RecordBatch
+ ///
+ /// \param[in] stripe the stripe index
/// \param[in] include_indices the selected field indices to read
/// \param[out] out the returned RecordBatch
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
std::shared_ptr<RecordBatch>* out);
+ /// \brief Read a single stripe as a RecordBatch
+ ///
+ /// \param[in] stripe the stripe index
+ /// \param[in] include_indices the selected field indices to read
+ /// \return the returned RecordBatch
+ Result<std::shared_ptr<RecordBatch>> ReadStripe(
+ int64_t stripe, const std::vector<int>& include_indices);
+
/// \brief Seek to designated row. Invoke NextStripeReader() after seek
/// will return stripe reader starting from designated row.
///
@@ -119,6 +188,7 @@ class ARROW_EXPORT ORCFileReader {
/// \param[in] batch_size the number of rows each record batch contains in
/// record batch iteration.
/// \param[out] out the returned stripe reader
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out);
/// \brief Get a stripe level record batch iterator with specified row count
@@ -126,14 +196,38 @@ class ARROW_EXPORT ORCFileReader {
/// alternative to ReadStripe which may cause OOM issue by loading
/// the whole stripes into memory.
///
+ /// \param[in] batch_size the number of rows each record batch contains in
+ /// record batch iteration.
+ /// \return the returned stripe reader
+ Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(int64_t batch_size);
+
+ /// \brief Get a stripe level record batch iterator with specified row count
+ /// in each record batch. NextStripeReader serves as a fine grain
+ /// alternative to ReadStripe which may cause OOM issue by loading
+ /// the whole stripes into memory.
+ ///
/// \param[in] batch_size Get a stripe level record batch iterator with specified row
/// count in each record batch.
///
/// \param[in] include_indices the selected field indices to read
/// \param[out] out the returned stripe reader
+ ARROW_DEPRECATED("Deprecated in 6.0.0. Use Result-returning overload instead.")
Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
std::shared_ptr<RecordBatchReader>* out);
+ /// \brief Get a stripe level record batch iterator with specified row count
+ /// in each record batch. NextStripeReader serves as a fine grain
+ /// alternative to ReadStripe which may cause OOM issue by loading
+ /// the whole stripes into memory.
+ ///
+ /// \param[in] batch_size Get a stripe level record batch iterator with specified row
+ /// count in each record batch.
+ ///
+ /// \param[in] include_indices the selected field indices to read
+ /// \return the returned stripe reader
+ Result<std::shared_ptr<RecordBatchReader>> NextStripeReader(
+ int64_t batch_size, const std::vector<int>& include_indices);
+
/// \brief The number of stripes in the file
int64_t NumberOfStripes();
diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc
index 9f7fb56..39c66b9 100644
--- a/cpp/src/arrow/adapters/orc/adapter_test.cc
+++ b/cpp/src/arrow/adapters/orc/adapter_test.cc
@@ -237,13 +237,12 @@ void AssertTableWriteReadEqual(const std::shared_ptr<Table>& input_table,
ARROW_EXPECT_OK(writer->Close());
EXPECT_OK_AND_ASSIGN(auto buffer, buffer_output_stream->Finish());
std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader(buffer));
- std::unique_ptr<adapters::orc::ORCFileReader> reader;
- ARROW_EXPECT_OK(
- adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader));
- std::shared_ptr<Table> actual_output_table;
- ARROW_EXPECT_OK(reader->Read(&actual_output_table));
+ EXPECT_OK_AND_ASSIGN(
+ auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
+ EXPECT_OK_AND_ASSIGN(auto actual_output_table, reader->Read());
AssertTablesEqual(*expected_output_table, *actual_output_table, false, false);
}
+
void AssertArrayWriteReadEqual(const std::shared_ptr<Array>& input_array,
const std::shared_ptr<Array>& expected_output_array,
const int64_t max_size = kDefaultSmallMemStreamSize) {
@@ -323,9 +322,8 @@ TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) {
std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()),
static_cast<int64_t>(mem_stream.getLength()))));
- std::unique_ptr<adapters::orc::ORCFileReader> reader;
- ASSERT_TRUE(
- adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader).ok());
+ ASSERT_OK_AND_ASSIGN(
+ auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
EXPECT_OK_AND_ASSIGN(auto metadata, reader->ReadMetadata());
auto expected_metadata = std::const_pointer_cast<const KeyValueMetadata>(
@@ -334,8 +332,7 @@ TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) {
ASSERT_EQ(stripe_row_count * stripe_count, reader->NumberOfRows());
ASSERT_EQ(stripe_count, reader->NumberOfStripes());
accumulated = 0;
- std::shared_ptr<RecordBatchReader> stripe_reader;
- EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+ EXPECT_OK_AND_ASSIGN(auto stripe_reader, reader->NextStripeReader(reader_batch_size));
while (stripe_reader) {
std::shared_ptr<RecordBatch> record_batch;
EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
@@ -350,14 +347,14 @@ TEST(TestAdapterRead, ReadIntAndStringFileMultipleStripes) {
}
EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
}
- EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+ EXPECT_OK_AND_ASSIGN(stripe_reader, reader->NextStripeReader(reader_batch_size));
}
// test seek operation
int64_t start_offset = 830;
EXPECT_TRUE(reader->Seek(stripe_row_count + start_offset).ok());
- EXPECT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+ EXPECT_OK_AND_ASSIGN(stripe_reader, reader->NextStripeReader(reader_batch_size));
std::shared_ptr<RecordBatch> record_batch;
EXPECT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
while (record_batch) {