You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/03/25 04:22:23 UTC
[arrow] branch master updated: ARROW-4772: [C++] new ORC adapter
interface for stripe and row iteration
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3129732 ARROW-4772: [C++] new ORC adapter interface for stripe and row iteration
3129732 is described below
commit 3129732a18210d0c8921b45f79be4f34eadf0cc3
Author: Yurui Zhou <yu...@alibaba-inc.com>
AuthorDate: Sun Mar 24 21:21:50 2019 -0700
ARROW-4772: [C++] new ORC adapter interface for stripe and row iteration
Improvement of the current ORC adapter interface that enables the following operations:
- seek to a designated row
- iterate over stripes with a StripeReader
- A StripeReader is necessary because ORC supports stripe-level dictionary
encoding; for this reason the Arrow schema can vary between
stripes if dictionary-based types are enabled.
- iterate over rows with a StripeReader
Author: Yurui Zhou <yu...@alibaba-inc.com>
Closes #3843 from yuruiz/OrcAdapterInterface and squashes the following commits:
9c3229b5 <Yurui Zhou> resolve comments
e3911374 <Yurui Zhou> resolve comments
74ea0727 <Yurui Zhou> fix clang format error
94d34827 <Yurui Zhou> resolve comments
0a525dcb <Yurui Zhou> Resolve comments
b241b64d <Yurui Zhou> fix lint error
bf12efed <Yurui Zhou> remove unnecessary deprecation mark
340f4f96 <Yurui Zhou> resolve code style issues
1f5ecdce <Yurui Zhou> fix clang format error
0547348f <Yurui Zhou> Fix cpplint errors
1d258540 <Yurui Zhou> Fix cmake format error
eb187c0e <Yurui Zhou> ARROW-4713: new ORC adapter interface for stripe and row iteration Improvement of the current ORC adapter interface that enables the following operations: - seek to a designated row - iterate over stripes with a StripeReader - A StripeReader is necessary because ORC supports stripe-level dictionary encoding; for this reason the Arrow schema can vary between stripes if dictionary-based types are enabled. - iterate over rows with a StripeReader
---
cpp/src/arrow/CMakeLists.txt | 2 +-
cpp/src/arrow/adapters/orc/CMakeLists.txt | 19 ++
cpp/src/arrow/adapters/orc/adapter-test.cc | 158 ++++++++++
cpp/src/arrow/adapters/orc/adapter.cc | 485 +++++++----------------------
cpp/src/arrow/adapters/orc/adapter.h | 31 +-
cpp/src/arrow/adapters/orc/adapter_util.cc | 427 +++++++++++++++++++++++++
cpp/src/arrow/adapters/orc/adapter_util.h | 44 +++
7 files changed, 792 insertions(+), 374 deletions(-)
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 865c453..3ff8ba1 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -202,7 +202,7 @@ endif()
if(ARROW_ORC)
add_subdirectory(adapters/orc)
- set(ARROW_SRCS adapters/orc/adapter.cc ${ARROW_SRCS})
+ # Both ORC adapter translation units are prepended to ARROW_SRCS.
+ set(ARROW_SRCS adapters/orc/adapter.cc adapters/orc/adapter_util.cc ${ARROW_SRCS})
endif()
if(ARROW_TENSORFLOW)
diff --git a/cpp/src/arrow/adapters/orc/CMakeLists.txt b/cpp/src/arrow/adapters/orc/CMakeLists.txt
index 6c8b47e..97bff89 100644
--- a/cpp/src/arrow/adapters/orc/CMakeLists.txt
+++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt
@@ -26,3 +26,22 @@ install(FILES adapter.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/
configure_file(arrow-orc.pc.in "${CMAKE_CURRENT_BINARY_DIR}/arrow-orc.pc" @ONLY)
install(FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-orc.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
+
+# --- ORC adapter unit test (adapter-test.cc) ---
+# GoogleTest targets linked into the ORC adapter test.
+set(ORC_MIN_TEST_LIBS GTest::Main GTest::GTest)
+
+# Arrow and Arrow-testing libraries for the test: the static variants when
+# ARROW_BUILD_STATIC is on, the shared variants otherwise.
+# NOTE(review): the variable name says "STATIC" even in the shared branch.
+if(ARROW_BUILD_STATIC)
+ set(ARROW_LIBRARIES_FOR_STATIC_TESTS arrow_testing_static arrow_static)
+else()
+ set(ARROW_LIBRARIES_FOR_STATIC_TESTS arrow_testing_shared arrow_shared)
+endif()
+
+# Platform libraries: CMAKE_DL_LIBS on Apple; pthread plus CMAKE_DL_LIBS on
+# other non-MSVC platforms; nothing extra on MSVC.
+if(APPLE)
+ set(ORC_MIN_TEST_LIBS ${ORC_MIN_TEST_LIBS} ${CMAKE_DL_LIBS})
+elseif(NOT MSVC)
+ set(ORC_MIN_TEST_LIBS ${ORC_MIN_TEST_LIBS} pthread ${CMAKE_DL_LIBS})
+endif()
+
+# Complete link line for the test, including the static ORC library.
+set(ORC_STATIC_TEST_LINK_LIBS ${ORC_MIN_TEST_LIBS} ${ARROW_LIBRARIES_FOR_STATIC_TESTS}
+ orc_static)
+
+# Define the test through Arrow's add_arrow_test helper with the "orc" prefix.
+add_arrow_test(adapter-test PREFIX "orc" STATIC_LINK_LIBS ${ORC_STATIC_TEST_LINK_LIBS})
diff --git a/cpp/src/arrow/adapters/orc/adapter-test.cc b/cpp/src/arrow/adapters/orc/adapter-test.cc
new file mode 100644
index 0000000..f42144e
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter-test.cc
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/adapters/orc/adapter.h"
+#include "arrow/array.h"
+#include "arrow/io/api.h"
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+#include <orc/OrcFile.hh>
+
+namespace liborc = orc;
+
+namespace arrow {
+
+constexpr int DEFAULT_MEM_STREAM_SIZE = 100 * 1024 * 1024;
+
+// liborc::OutputStream that collects an ORC file in a fixed-capacity
+// in-memory buffer so the tests can read it back without touching disk.
+class MemoryOutputStream : public liborc::OutputStream {
+ public:
+ // Takes size_t rather than POSIX ssize_t, which MSVC does not define.
+ explicit MemoryOutputStream(size_t capacity)
+ : data_(capacity), name_("MemoryOutputStream") {}
+
+ uint64_t getLength() const override { return length_; }
+
+ uint64_t getNaturalWriteSize() const override { return natural_write_size_; }
+
+ // Appends `size` bytes from `buf`. Throws std::length_error instead of
+ // writing past the end of the buffer when the capacity would be exceeded.
+ void write(const void* buf, size_t size) override {
+ if (size > data_.size() - length_) {
+ throw std::length_error("MemoryOutputStream capacity exceeded");
+ }
+ memcpy(data_.data() + length_, buf, size);
+ length_ += size;
+ }
+
+ const std::string& getName() const override { return name_; }
+
+ // Start of the bytes written so far; getLength() of them are valid.
+ const char* getData() const { return data_.data(); }
+
+ void close() override {}
+
+ // Discards the written bytes; the allocated buffer is kept for reuse.
+ void reset() { length_ = 0; }
+
+ private:
+ std::vector<char> data_;
+ std::string name_;
+ uint64_t length_ = 0;
+ // Write-size hint reported to the ORC writer. It must be initialized
+ // because getNaturalWriteSize() returns it.
+ uint64_t natural_write_size_ = 2048;
+};
+
+// Creates a liborc writer for schema `type` that writes to `stream`, using the
+// given stripe size, 1024-byte compression blocks, liborc's default memory
+// pool and a row index stride of 0.
+std::unique_ptr<liborc::Writer> CreateWriter(uint64_t stripe_size,
+ const liborc::Type& type,
+ liborc::OutputStream* stream) {
+ liborc::WriterOptions writer_options;
+ writer_options.setMemoryPool(liborc::getDefaultPool());
+ writer_options.setRowIndexStride(0);
+ writer_options.setCompressionBlockSize(1024);
+ writer_options.setStripeSize(stripe_size);
+ std::unique_ptr<liborc::Writer> writer =
+ liborc::createWriter(type, stream, writer_options);
+ return writer;
+}
+
+// Round-trip test for the stripe-level reading API. It writes `stripe_count`
+// batches of `stripe_row_count` rows (col1 = running row index modulo
+// stripe_row_count, col2 = that value as a decimal string) and reads the
+// whole file back through NextStripeReader(). It then checks that Seek() into
+// the second stripe makes the next stripe reader start at the requested row.
+TEST(TestAdapter, readIntAndStringFileMultipleStripes) {
+ MemoryOutputStream mem_stream(DEFAULT_MEM_STREAM_SIZE);
+ ORC_UNIQUE_PTR<liborc::Type> type(
+ liborc::Type::buildTypeFromString("struct<col1:int,col2:string>"));
+
+ constexpr uint64_t stripe_size = 1024; // 1K
+ constexpr uint64_t stripe_count = 10;
+ constexpr uint64_t stripe_row_count = 65535;
+ constexpr uint64_t reader_batch_size = 1024;
+
+ auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+ auto batch = writer->createRowBatch(stripe_row_count);
+ auto struct_batch = dynamic_cast<liborc::StructVectorBatch*>(batch.get());
+ auto long_batch = dynamic_cast<liborc::LongVectorBatch*>(struct_batch->fields[0]);
+ auto str_batch = dynamic_cast<liborc::StringVectorBatch*>(struct_batch->fields[1]);
+ int64_t accumulated = 0;
+
+ for (uint64_t j = 0; j < stripe_count; ++j) {
+ // Backing storage for this batch's strings. Every value is below 65535,
+ // so 65535 rows need at most 65535 * 5 = 327675 bytes.
+ char data_buffer[327675];
+ uint64_t offset = 0;
+ for (uint64_t i = 0; i < stripe_row_count; ++i) {
+ std::string str_data = std::to_string(accumulated % stripe_row_count);
+ long_batch->data[i] = static_cast<int64_t>(accumulated % stripe_row_count);
+ str_batch->data[i] = data_buffer + offset;
+ str_batch->length[i] = static_cast<int64_t>(str_data.size());
+ memcpy(data_buffer + offset, str_data.c_str(), str_data.size());
+ accumulated++;
+ offset += str_data.size();
+ }
+ struct_batch->numElements = stripe_row_count;
+ long_batch->numElements = stripe_row_count;
+ str_batch->numElements = stripe_row_count;
+
+ // NOTE(review): str_batch points into data_buffer, which goes out of scope
+ // after this iteration; this assumes add() consumes the strings right away.
+ writer->add(*batch);
+ }
+
+ writer->close();
+
+ std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader(
+ std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()),
+ static_cast<int64_t>(mem_stream.getLength()))));
+
+ std::unique_ptr<adapters::orc::ORCFileReader> reader;
+ ASSERT_TRUE(
+ adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader).ok());
+
+ ASSERT_EQ(stripe_row_count * stripe_count, reader->NumberOfRows());
+ ASSERT_EQ(stripe_count, reader->NumberOfStripes());
+
+ // Read every stripe. A null stripe reader means all rows were consumed, and
+ // a null record batch means the current stripe is exhausted. The calls that
+ // advance the loops are ASSERTed: on failure they leave their output
+ // pointer unchanged, which would otherwise make the loops spin forever.
+ accumulated = 0;
+ std::shared_ptr<RecordBatchReader> stripe_reader;
+ ASSERT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+ while (stripe_reader) {
+ std::shared_ptr<RecordBatch> record_batch;
+ ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+ while (record_batch) {
+ auto int32_array = std::dynamic_pointer_cast<Int32Array>(record_batch->column(0));
+ auto str_array = std::dynamic_pointer_cast<StringArray>(record_batch->column(1));
+ for (int j = 0; j < record_batch->num_rows(); ++j) {
+ EXPECT_EQ(accumulated % stripe_row_count, int32_array->Value(j));
+ EXPECT_EQ(std::to_string(accumulated % stripe_row_count),
+ str_array->GetString(j));
+ accumulated++;
+ }
+ ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+ }
+ ASSERT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+ }
+ // Guards against the loops above passing vacuously.
+ EXPECT_EQ(static_cast<int64_t>(stripe_row_count * stripe_count), accumulated);
+
+ // Seek into the second stripe. The next stripe reader should start at value
+ // start_offset and continue to the end of that stripe.
+ int64_t start_offset = 830;
+ ASSERT_TRUE(reader->Seek(stripe_row_count + start_offset).ok());
+
+ ASSERT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+ ASSERT_TRUE(stripe_reader != nullptr);
+ std::shared_ptr<RecordBatch> record_batch;
+ ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+ while (record_batch) {
+ auto int32_array = std::dynamic_pointer_cast<Int32Array>(record_batch->column(0));
+ auto str_array = std::dynamic_pointer_cast<StringArray>(record_batch->column(1));
+ for (int j = 0; j < record_batch->num_rows(); ++j) {
+ EXPECT_EQ(start_offset % stripe_row_count, int32_array->Value(j));
+ EXPECT_EQ(std::to_string(start_offset % stripe_row_count),
+ str_array->GetString(j));
+ start_offset++;
+ }
+ ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+ }
+ // The reader stops exactly at the end of the second stripe.
+ EXPECT_EQ(static_cast<int64_t>(stripe_row_count), start_offset);
+}
+} // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc
index 78a321f..89ad597 100644
--- a/cpp/src/arrow/adapters/orc/adapter.cc
+++ b/cpp/src/arrow/adapters/orc/adapter.cc
@@ -16,6 +16,7 @@
// under the License.
#include "arrow/adapters/orc/adapter.h"
+#include "arrow/adapters/orc/adapter_util.h"
#include <algorithm>
#include <cstdint>
@@ -102,120 +103,58 @@ struct StripeInformation {
uint64_t offset;
uint64_t length;
uint64_t num_rows;
+ uint64_t first_row_of_stripe;
};
-Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) {
- // When subselecting fields on read, liborc will set some nodes to nullptr,
- // so we need to check for nullptr before progressing
- if (type == nullptr) {
- *out = null();
- return Status::OK();
- }
- liborc::TypeKind kind = type->getKind();
- const int subtype_count = static_cast<int>(type->getSubtypeCount());
-
- switch (kind) {
- case liborc::BOOLEAN:
- *out = boolean();
- break;
- case liborc::BYTE:
- *out = int8();
- break;
- case liborc::SHORT:
- *out = int16();
- break;
- case liborc::INT:
- *out = int32();
- break;
- case liborc::LONG:
- *out = int64();
- break;
- case liborc::FLOAT:
- *out = float32();
- break;
- case liborc::DOUBLE:
- *out = float64();
- break;
- case liborc::VARCHAR:
- case liborc::STRING:
- *out = utf8();
- break;
- case liborc::BINARY:
- *out = binary();
- break;
- case liborc::CHAR:
- *out = fixed_size_binary(static_cast<int>(type->getMaximumLength()));
- break;
- case liborc::TIMESTAMP:
- *out = timestamp(TimeUnit::NANO);
- break;
- case liborc::DATE:
- *out = date32();
- break;
- case liborc::DECIMAL: {
- const int precision = static_cast<int>(type->getPrecision());
- const int scale = static_cast<int>(type->getScale());
- if (precision == 0) {
- // In HIVE 0.11/0.12 precision is set as 0, but means max precision
- *out = decimal(38, 6);
- } else {
- *out = decimal(precision, scale);
- }
- break;
- }
- case liborc::LIST: {
- if (subtype_count != 1) {
- return Status::Invalid("Invalid Orc List type");
- }
- std::shared_ptr<DataType> elemtype;
- RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &elemtype));
- *out = list(elemtype);
- break;
- }
- case liborc::MAP: {
- if (subtype_count != 2) {
- return Status::Invalid("Invalid Orc Map type");
- }
- std::shared_ptr<DataType> keytype;
- std::shared_ptr<DataType> valtype;
- RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &keytype));
- RETURN_NOT_OK(GetArrowType(type->getSubtype(1), &valtype));
- *out = list(struct_({field("key", keytype), field("value", valtype)}));
- break;
+// The number of rows to read in a ColumnVectorBatch
+constexpr int64_t kReadRowsBatch = 1000;
+
+// RecordBatchReader over the rows produced by one liborc::RowReader. Each
+// ReadNext() decodes up to `batch_size` rows into a RecordBatch with `schema`.
+class OrcStripeReader : public RecordBatchReader {
+ public:
+ OrcStripeReader(std::unique_ptr<liborc::RowReader> row_reader,
+ std::shared_ptr<Schema> schema, int64_t batch_size, MemoryPool* pool)
+ : row_reader_(std::move(row_reader)),
+ schema_(std::move(schema)),
+ pool_(pool),
+ batch_size_{batch_size} {}
+
+ std::shared_ptr<Schema> schema() const override { return schema_; }
+
+ // Reads the next batch of rows into *out, or sets *out to null when the row
+ // reader has no rows left. liborc::ParseError is reported as Status::Invalid.
+ Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+ std::unique_ptr<liborc::ColumnVectorBatch> batch;
+ bool has_rows = false;
+ try {
+ batch = row_reader_->createRowBatch(batch_size_);
+ // next() reads and decodes stripe data and may also throw
+ // liborc::ParseError (presumably on corrupt input). It stays inside the
+ // try block so the error surfaces as a Status, not an escaping exception.
+ has_rows = row_reader_->next(*batch);
+ } catch (const liborc::ParseError& e) {
+ return Status::Invalid(e.what());
}
- case liborc::STRUCT: {
- std::vector<std::shared_ptr<Field>> fields;
- for (int child = 0; child < subtype_count; ++child) {
- std::shared_ptr<DataType> elemtype;
- RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
- std::string name = type->getFieldName(child);
- fields.push_back(field(name, elemtype));
- }
- *out = struct_(fields);
- break;
+
+ if (!has_rows) {
+ out->reset();
+ return Status::OK();
}
- case liborc::UNION: {
- std::vector<std::shared_ptr<Field>> fields;
- std::vector<uint8_t> type_codes;
- for (int child = 0; child < subtype_count; ++child) {
- std::shared_ptr<DataType> elemtype;
- RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
- fields.push_back(field("_union_" + std::to_string(child), elemtype));
- type_codes.push_back(static_cast<uint8_t>(child));
- }
- *out = union_(fields, type_codes);
- break;
+
+ std::unique_ptr<RecordBatchBuilder> builder;
+ RETURN_NOT_OK(RecordBatchBuilder::Make(schema_, pool_, batch->numElements, &builder));
+
+ // The top-level type must be a struct to read into an arrow table
+ const liborc::Type& type = row_reader_->getSelectedType();
+ const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);
+
+ for (int i = 0; i < builder->num_fields(); i++) {
+ RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
+ batch->numElements, builder->GetField(i)));
}
- default: { return Status::Invalid("Unknown Orc type kind: ", kind); }
- }
- return Status::OK();
-}
-// The number of rows to read in a ColumnVectorBatch
-constexpr int64_t kReadRowsBatch = 1000;
+ RETURN_NOT_OK(builder->Flush(out));
+ return Status::OK();
+ }
-// The numer of nanoseconds in a second
-constexpr int64_t kOneSecondNanos = 1000000000LL;
+ private:
+ std::unique_ptr<liborc::RowReader> row_reader_;
+ std::shared_ptr<Schema> schema_;
+ MemoryPool* pool_;
+ int64_t batch_size_;
+};
class ORCFileReader::Impl {
public:
@@ -233,6 +172,7 @@ class ORCFileReader::Impl {
}
pool_ = pool;
reader_ = std::move(liborc_reader);
+ current_row_ = 0;
return Init();
}
@@ -241,10 +181,12 @@ class ORCFileReader::Impl {
int64_t nstripes = reader_->getNumberOfStripes();
stripes_.resize(nstripes);
std::unique_ptr<liborc::StripeInformation> stripe;
+ uint64_t first_row_of_stripe = 0;
for (int i = 0; i < nstripes; ++i) {
stripe = reader_->getStripe(i);
- stripes_[i] = StripeInformation(
- {stripe->getOffset(), stripe->getLength(), stripe->getNumberOfRows()});
+ stripes_[i] = StripeInformation({stripe->getOffset(), stripe->getLength(),
+ stripe->getNumberOfRows(), first_row_of_stripe});
+ first_row_of_stripe += stripe->getNumberOfRows();
}
return Status::OK();
}
@@ -349,6 +291,23 @@ class ORCFileReader::Impl {
return Status::OK();
}
+ // Restricts `opts` to the byte range of the stripe that contains file-level
+ // row `row_number`, and copies that stripe's metadata into `out`.
+ // Returns Invalid if row_number is negative or not below NumberOfRows().
+ Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number,
+ StripeInformation* out) {
+ ARROW_RETURN_IF(row_number < 0 || row_number >= NumberOfRows(),
+ Status::Invalid("Out of bounds row number: ", row_number));
+
+ const auto row = static_cast<uint64_t>(row_number);
+ for (const auto& stripe : stripes_) {
+ if (row >= stripe.first_row_of_stripe &&
+ row < stripe.first_row_of_stripe + stripe.num_rows) {
+ opts->range(stripe.offset, stripe.length);
+ *out = stripe;
+ return Status::OK();
+ }
+ }
+
+ return Status::Invalid("Invalid row number: ", row_number);
+ }
+
Status SelectIndices(liborc::RowReaderOptions* opts,
const std::vector<int>& include_indices) {
std::list<uint64_t> include_indices_list;
@@ -374,11 +333,11 @@ class ORCFileReader::Impl {
Status ReadBatch(const liborc::RowReaderOptions& opts,
const std::shared_ptr<Schema>& schema, int64_t nrows,
std::shared_ptr<RecordBatch>* out) {
- std::unique_ptr<liborc::RowReader> rowreader;
+ std::unique_ptr<liborc::RowReader> row_reader;
std::unique_ptr<liborc::ColumnVectorBatch> batch;
try {
- rowreader = reader_->createRowReader(opts);
- batch = rowreader->createRowBatch(std::min(nrows, kReadRowsBatch));
+ row_reader = reader_->createRowReader(opts);
+ batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch));
} catch (const liborc::ParseError& e) {
return Status::Invalid(e.what());
}
@@ -388,8 +347,8 @@ class ORCFileReader::Impl {
// The top-level type must be a struct to read into an arrow table
const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);
- const liborc::Type& type = rowreader->getSelectedType();
- while (rowreader->next(*batch)) {
+ const liborc::Type& type = row_reader->getSelectedType();
+ while (row_reader->next(*batch)) {
for (int i = 0; i < builder->num_fields(); i++) {
RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
batch->numElements, builder->GetField(i)));
@@ -399,283 +358,52 @@ class ORCFileReader::Impl {
return Status::OK();
}
- Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
- int64_t offset, int64_t length, ArrayBuilder* builder) {
- if (type == nullptr) {
- return Status::OK();
- }
- liborc::TypeKind kind = type->getKind();
- switch (kind) {
- case liborc::STRUCT:
- return AppendStructBatch(type, batch, offset, length, builder);
- case liborc::LIST:
- return AppendListBatch(type, batch, offset, length, builder);
- case liborc::MAP:
- return AppendMapBatch(type, batch, offset, length, builder);
- case liborc::LONG:
- return AppendNumericBatch<Int64Builder, liborc::LongVectorBatch, int64_t>(
- batch, offset, length, builder);
- case liborc::INT:
- return AppendNumericBatchCast<Int32Builder, int32_t, liborc::LongVectorBatch,
- int64_t>(batch, offset, length, builder);
- case liborc::SHORT:
- return AppendNumericBatchCast<Int16Builder, int16_t, liborc::LongVectorBatch,
- int64_t>(batch, offset, length, builder);
- case liborc::BYTE:
- return AppendNumericBatchCast<Int8Builder, int8_t, liborc::LongVectorBatch,
- int64_t>(batch, offset, length, builder);
- case liborc::DOUBLE:
- return AppendNumericBatch<DoubleBuilder, liborc::DoubleVectorBatch, double>(
- batch, offset, length, builder);
- case liborc::FLOAT:
- return AppendNumericBatchCast<FloatBuilder, float, liborc::DoubleVectorBatch,
- double>(batch, offset, length, builder);
- case liborc::BOOLEAN:
- return AppendBoolBatch(batch, offset, length, builder);
- case liborc::VARCHAR:
- case liborc::STRING:
- return AppendBinaryBatch<StringBuilder>(batch, offset, length, builder);
- case liborc::BINARY:
- return AppendBinaryBatch<BinaryBuilder>(batch, offset, length, builder);
- case liborc::CHAR:
- return AppendFixedBinaryBatch(batch, offset, length, builder);
- case liborc::DATE:
- return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch,
- int64_t>(batch, offset, length, builder);
- case liborc::TIMESTAMP:
- return AppendTimestampBatch(batch, offset, length, builder);
- case liborc::DECIMAL:
- return AppendDecimalBatch(type, batch, offset, length, builder);
- default:
- return Status::NotImplemented("Not implemented type kind: ", kind);
- }
- }
+ // Positions the reader so that the next NextStripeReader() call starts at
+ // file-level row `row_number`. Returns Invalid (leaving the position
+ // unchanged) for a negative row number or one not below NumberOfRows().
+ Status Seek(int64_t row_number) {
+ ARROW_RETURN_IF(row_number < 0 || row_number >= NumberOfRows(),
+ Status::Invalid("Out of bounds row number: ", row_number));
- Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
- int64_t offset, int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<StructBuilder*>(abuilder);
- auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
-
- const uint8_t* valid_bytes = nullptr;
- if (batch->hasNulls) {
- valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
- }
- RETURN_NOT_OK(builder->AppendValues(length, valid_bytes));
-
- for (int i = 0; i < builder->num_fields(); i++) {
- RETURN_NOT_OK(AppendBatch(type->getSubtype(i), batch->fields[i], offset, length,
- builder->field_builder(i)));
- }
+ current_row_ = row_number;
return Status::OK();
}
- Status AppendListBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
- int64_t offset, int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<ListBuilder*>(abuilder);
- auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
- liborc::ColumnVectorBatch* elements = batch->elements.get();
- const liborc::Type* elemtype = type->getSubtype(0);
-
- const bool has_nulls = batch->hasNulls;
- for (int64_t i = offset; i < length + offset; i++) {
- if (!has_nulls || batch->notNull[i]) {
- int64_t start = batch->offsets[i];
- int64_t end = batch->offsets[i + 1];
- RETURN_NOT_OK(builder->Append());
- RETURN_NOT_OK(AppendBatch(elemtype, elements, start, end - start,
- builder->value_builder()));
- } else {
- RETURN_NOT_OK(builder->AppendNull());
- }
- }
- return Status::OK();
- }
-
- Status AppendMapBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
- int64_t offset, int64_t length, ArrayBuilder* abuilder) {
- auto list_builder = checked_cast<ListBuilder*>(abuilder);
- auto struct_builder = checked_cast<StructBuilder*>(list_builder->value_builder());
- auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch);
- liborc::ColumnVectorBatch* keys = batch->keys.get();
- liborc::ColumnVectorBatch* vals = batch->elements.get();
- const liborc::Type* keytype = type->getSubtype(0);
- const liborc::Type* valtype = type->getSubtype(1);
-
- const bool has_nulls = batch->hasNulls;
- for (int64_t i = offset; i < length + offset; i++) {
- RETURN_NOT_OK(list_builder->Append());
- int64_t start = batch->offsets[i];
- int64_t list_length = batch->offsets[i + 1] - start;
- if (list_length && (!has_nulls || batch->notNull[i])) {
- RETURN_NOT_OK(struct_builder->AppendValues(list_length, nullptr));
- RETURN_NOT_OK(AppendBatch(keytype, keys, start, list_length,
- struct_builder->field_builder(0)));
- RETURN_NOT_OK(AppendBatch(valtype, vals, start, list_length,
- struct_builder->field_builder(1)));
- }
- }
- return Status::OK();
- }
-
- template <class builder_type, class batch_type, class elem_type>
- Status AppendNumericBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
- int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<builder_type*>(abuilder);
- auto batch = checked_cast<batch_type*>(cbatch);
-
- if (length == 0) {
- return Status::OK();
- }
- const uint8_t* valid_bytes = nullptr;
- if (batch->hasNulls) {
- valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
- }
- const elem_type* source = batch->data.data() + offset;
- RETURN_NOT_OK(builder->AppendValues(source, length, valid_bytes));
- return Status::OK();
- }
-
- template <class builder_type, class target_type, class batch_type, class source_type>
- Status AppendNumericBatchCast(liborc::ColumnVectorBatch* cbatch, int64_t offset,
- int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<builder_type*>(abuilder);
- auto batch = checked_cast<batch_type*>(cbatch);
-
- if (length == 0) {
- return Status::OK();
- }
-
- const uint8_t* valid_bytes = nullptr;
- if (batch->hasNulls) {
- valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
- }
- const source_type* source = batch->data.data() + offset;
- auto cast_iter = internal::MakeLazyRange(
- [&source](int64_t index) { return static_cast<target_type>(source[index]); },
- length);
-
- RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes));
-
- return Status::OK();
- }
-
- Status AppendBoolBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
- int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<BooleanBuilder*>(abuilder);
- auto batch = checked_cast<liborc::LongVectorBatch*>(cbatch);
-
- if (length == 0) {
- return Status::OK();
- }
-
- const uint8_t* valid_bytes = nullptr;
- if (batch->hasNulls) {
- valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
- }
- const int64_t* source = batch->data.data() + offset;
-
- auto cast_iter = internal::MakeLazyRange(
- [&source](int64_t index) { return static_cast<bool>(source[index]); }, length);
-
- RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes));
-
- return Status::OK();
- }
-
- Status AppendTimestampBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
- int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<TimestampBuilder*>(abuilder);
- auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
-
- if (length == 0) {
+ // Returns a reader over the rest of the stripe that contains the current row
+ // (set by Seek() or by the previous call), restricted to `include_indices`
+ // when that vector is non-empty. The current row then advances to the first
+ // row of the following stripe. Sets *out to null once every row was consumed.
+ Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
+ std::shared_ptr<RecordBatchReader>* out) {
+ if (current_row_ >= NumberOfRows()) {
+ out->reset();
return Status::OK();
}
- const uint8_t* valid_bytes = nullptr;
- if (batch->hasNulls) {
- valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+ liborc::RowReaderOptions opts;
+ if (!include_indices.empty()) {
+ RETURN_NOT_OK(SelectIndices(&opts, include_indices));
}
-
- const int64_t* seconds = batch->data.data() + offset;
- const int64_t* nanos = batch->nanoseconds.data() + offset;
-
- auto transform_timestamp = [seconds, nanos](int64_t index) {
- return seconds[index] * kOneSecondNanos + nanos[index];
- };
-
- auto transform_range = internal::MakeLazyRange(transform_timestamp, length);
-
- RETURN_NOT_OK(builder->AppendValues(transform_range.begin(), transform_range.end(),
- valid_bytes));
- return Status::OK();
- }
-
- template <class builder_type>
- Status AppendBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
- int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<builder_type*>(abuilder);
- auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
-
- const bool has_nulls = batch->hasNulls;
- for (int64_t i = offset; i < length + offset; i++) {
- if (!has_nulls || batch->notNull[i]) {
- RETURN_NOT_OK(
- builder->Append(batch->data[i], static_cast<int32_t>(batch->length[i])));
- } else {
- RETURN_NOT_OK(builder->AppendNull());
- }
+ StripeInformation stripe_info;
+ RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info));
+ std::shared_ptr<Schema> schema;
+ RETURN_NOT_OK(ReadSchema(opts, &schema));
+ std::unique_ptr<liborc::RowReader> row_reader;
+ try {
+ row_reader = reader_->createRowReader(opts);
+ // current_row_ is a file-level row number; after a Seek() it may lie in
+ // the middle of the selected stripe.
+ row_reader->seekToRow(current_row_);
+ current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows;
+ } catch (const liborc::ParseError& e) {
+ return Status::Invalid(e.what());
}
- return Status::OK();
- }
- Status AppendFixedBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
- int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<FixedSizeBinaryBuilder*>(abuilder);
- auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
-
- const bool has_nulls = batch->hasNulls;
- for (int64_t i = offset; i < length + offset; i++) {
- if (!has_nulls || batch->notNull[i]) {
- RETURN_NOT_OK(builder->Append(batch->data[i]));
- } else {
- RETURN_NOT_OK(builder->AppendNull());
- }
- }
+ // make_shared uses a single allocation for object and control block; the
+ // schema is moved because it is not used after this point.
+ *out = std::make_shared<OrcStripeReader>(std::move(row_reader), std::move(schema),
+ batch_size, pool_);
return Status::OK();
}
- Status AppendDecimalBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
- int64_t offset, int64_t length, ArrayBuilder* abuilder) {
- auto builder = checked_cast<Decimal128Builder*>(abuilder);
-
- const bool has_nulls = cbatch->hasNulls;
- if (type->getPrecision() == 0 || type->getPrecision() > 18) {
- auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
- for (int64_t i = offset; i < length + offset; i++) {
- if (!has_nulls || batch->notNull[i]) {
- RETURN_NOT_OK(builder->Append(
- Decimal128(batch->values[i].getHighBits(), batch->values[i].getLowBits())));
- } else {
- RETURN_NOT_OK(builder->AppendNull());
- }
- }
- } else {
- auto batch = checked_cast<liborc::Decimal64VectorBatch*>(cbatch);
- for (int64_t i = offset; i < length + offset; i++) {
- if (!has_nulls || batch->notNull[i]) {
- RETURN_NOT_OK(builder->Append(Decimal128(batch->values[i])));
- } else {
- RETURN_NOT_OK(builder->AppendNull());
- }
- }
- }
- return Status::OK();
+ // Convenience overload: an empty index list means no column filter, so every
+ // column of the next stripe is read.
+ Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out) {
+ const std::vector<int> no_column_filter;
+ return NextStripeReader(batch_size, no_column_filter, out);
}
private:
MemoryPool* pool_;
std::unique_ptr<liborc::Reader> reader_;
std::vector<StripeInformation> stripes_;
+ int64_t current_row_;
};
ORCFileReader::ORCFileReader() { impl_.reset(new ORCFileReader::Impl()); }
@@ -721,6 +449,19 @@ Status ORCFileReader::ReadStripe(int64_t stripe, const std::vector<int>& include
return impl_->ReadStripe(stripe, include_indices, out);
}
+// Public entry points; each forwards directly to the pimpl.
+Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); }
+
+// Parameter named batch_size to match the declaration in adapter.h and the
+// overload below.
+Status ORCFileReader::NextStripeReader(int64_t batch_size,
+ std::shared_ptr<RecordBatchReader>* out) {
+ return impl_->NextStripeReader(batch_size, out);
+}
+
+Status ORCFileReader::NextStripeReader(int64_t batch_size,
+ const std::vector<int>& include_indices,
+ std::shared_ptr<RecordBatchReader>* out) {
+ return impl_->NextStripeReader(batch_size, include_indices, out);
+}
+
int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h
index 5ae19b2..6279f68 100644
--- a/cpp/src/arrow/adapters/orc/adapter.h
+++ b/cpp/src/arrow/adapters/orc/adapter.h
@@ -41,7 +41,7 @@ class ARROW_EXPORT ORCFileReader {
public:
~ORCFileReader();
- /// \brief Create a new ORC reader
+ /// \brief Creates a new ORC reader.
///
/// \param[in] file the data source
/// \param[in] pool a MemoryPool to use for buffer allocations
@@ -102,6 +102,35 @@ class ARROW_EXPORT ORCFileReader {
Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
std::shared_ptr<RecordBatch>* out);
+ /// \brief Seek to the designated row. A NextStripeReader() call made
+ /// after the seek returns a stripe reader starting from that row.
+ ///
+ /// \param[in] row_number the number of the row to seek to
+ Status Seek(int64_t row_number);
+
+ /// \brief Get a stripe-level record batch iterator with the specified row
+ /// count in each record batch. NextStripeReader serves as a fine-grained
+ /// alternative to ReadStripe, which loads the whole stripe into memory and
+ /// may therefore cause out-of-memory issues.
+ ///
+ /// \param[in] batch_size the number of rows each record batch contains
+ /// during record batch iteration.
+ /// \param[out] out the returned stripe reader
+ Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out);
+
+ /// \brief Get a stripe-level record batch iterator with the specified row
+ /// count in each record batch, reading only the selected fields.
+ /// NextStripeReader serves as a fine-grained alternative to ReadStripe,
+ /// which loads the whole stripe into memory and may cause out-of-memory issues.
+ ///
+ /// \param[in] batch_size the number of rows each record batch contains
+ /// during record batch iteration.
+ ///
+ /// \param[in] include_indices the selected field indices to read
+ /// \param[out] out the returned stripe reader
+ Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
+ std::shared_ptr<RecordBatchReader>* out);
+
/// \brief Return the number of stripes in the file
int64_t NumberOfStripes();
diff --git a/cpp/src/arrow/adapters/orc/adapter_util.cc b/cpp/src/arrow/adapters/orc/adapter_util.cc
new file mode 100644
index 0000000..235e5ba
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter_util.cc
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/adapters/orc/adapter_util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/builder.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/lazy.h"
+
+#include "orc/Exceptions.hh"
+#include "orc/OrcFile.hh"
+
+// alias to not interfere with nested orc namespace
+namespace liborc = orc;
+
+namespace arrow {
+
+namespace adapters {
+
+namespace orc {
+
+using internal::checked_cast;
+
+// The number of nanoseconds in one second, used to fold ORC's
+// (seconds, nanoseconds) timestamp representation into a single value.
+constexpr int64_t kOneSecondNanos = 1000000000LL;
+
+// Append `length` struct slots, starting at row `offset`, from an ORC
+// StructVectorBatch, then recurse into every child column for the same rows.
+Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                         int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto* struct_builder = checked_cast<StructBuilder*>(abuilder);
+  auto* struct_batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
+
+  // notNull holds one byte per row; only pass it on when nulls are present.
+  const uint8_t* validity =
+      struct_batch->hasNulls
+          ? reinterpret_cast<const uint8_t*>(struct_batch->notNull.data()) + offset
+          : nullptr;
+  RETURN_NOT_OK(struct_builder->AppendValues(length, validity));
+
+  const int child_count = struct_builder->num_fields();
+  for (int child = 0; child < child_count; ++child) {
+    RETURN_NOT_OK(AppendBatch(type->getSubtype(child), struct_batch->fields[child],
+                              offset, length, struct_builder->field_builder(child)));
+  }
+  return Status::OK();
+}
+
+// Append `length` list slots, starting at row `offset`, from an ORC
+// ListVectorBatch. Each non-null slot's element range is taken from the
+// batch offsets and appended recursively to the list's value builder.
+Status AppendListBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto* list_builder = checked_cast<ListBuilder*>(abuilder);
+  auto* list_batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  liborc::ColumnVectorBatch* child_batch = list_batch->elements.get();
+  const liborc::Type* child_type = type->getSubtype(0);
+  ArrayBuilder* child_builder = list_builder->value_builder();
+
+  const int64_t end_row = offset + length;
+  for (int64_t row = offset; row < end_row; ++row) {
+    if (list_batch->hasNulls && !list_batch->notNull[row]) {
+      RETURN_NOT_OK(list_builder->AppendNull());
+      continue;
+    }
+    const int64_t first = list_batch->offsets[row];
+    const int64_t count = list_batch->offsets[row + 1] - first;
+    RETURN_NOT_OK(list_builder->Append());
+    RETURN_NOT_OK(AppendBatch(child_type, child_batch, first, count, child_builder));
+  }
+  return Status::OK();
+}
+
+// Append `length` map slots, starting at row `offset`, from an ORC
+// MapVectorBatch. Each ORC map is appended as a list of {key, value} structs:
+// keys go to struct child 0 and values to struct child 1.
+//
+// A null ORC map becomes a null list slot. Previously, null maps were
+// appended as empty (valid) lists.
+Status AppendMapBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                      int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto list_builder = checked_cast<ListBuilder*>(abuilder);
+  auto struct_builder = checked_cast<StructBuilder*>(list_builder->value_builder());
+  auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch);
+  liborc::ColumnVectorBatch* keys = batch->keys.get();
+  liborc::ColumnVectorBatch* vals = batch->elements.get();
+  const liborc::Type* keytype = type->getSubtype(0);
+  const liborc::Type* valtype = type->getSubtype(1);
+
+  const bool has_nulls = batch->hasNulls;
+  for (int64_t i = offset; i < length + offset; i++) {
+    if (has_nulls && !batch->notNull[i]) {
+      RETURN_NOT_OK(list_builder->AppendNull());
+      continue;
+    }
+    RETURN_NOT_OK(list_builder->Append());
+    const int64_t start = batch->offsets[i];
+    const int64_t list_length = batch->offsets[i + 1] - start;
+    if (list_length > 0) {
+      // The {key, value} struct entries themselves are always valid. Nulls in
+      // individual keys or values are handled by the recursive child appends.
+      RETURN_NOT_OK(struct_builder->AppendValues(list_length, nullptr));
+      RETURN_NOT_OK(AppendBatch(keytype, keys, start, list_length,
+                                struct_builder->field_builder(0)));
+      RETURN_NOT_OK(AppendBatch(valtype, vals, start, list_length,
+                                struct_builder->field_builder(1)));
+    }
+  }
+  return Status::OK();
+}
+
+// Append `length` values, starting at row `offset`, from an ORC batch whose
+// element type already matches the Arrow builder's storage type, so the data
+// can be bulk-copied without conversion.
+template <class builder_type, class batch_type, class elem_type>
+Status AppendNumericBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                          int64_t length, ArrayBuilder* abuilder) {
+  auto* typed_builder = checked_cast<builder_type*>(abuilder);
+  auto* typed_batch = checked_cast<batch_type*>(cbatch);
+
+  if (length != 0) {
+    const uint8_t* validity = nullptr;
+    if (typed_batch->hasNulls) {
+      validity = reinterpret_cast<const uint8_t*>(typed_batch->notNull.data()) + offset;
+    }
+    const elem_type* values = typed_batch->data.data() + offset;
+    RETURN_NOT_OK(typed_builder->AppendValues(values, length, validity));
+  }
+  return Status::OK();
+}
+
+// Append `length` values, starting at row `offset`, from an ORC batch whose
+// element type (source_type) differs from the Arrow storage type
+// (target_type). Each value is static_cast as the builder consumes it.
+template <class builder_type, class target_type, class batch_type, class source_type>
+Status AppendNumericBatchCast(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                              int64_t length, ArrayBuilder* abuilder) {
+  auto* typed_builder = checked_cast<builder_type*>(abuilder);
+  auto* typed_batch = checked_cast<batch_type*>(cbatch);
+  if (length == 0) return Status::OK();
+
+  const uint8_t* validity =
+      typed_batch->hasNulls
+          ? reinterpret_cast<const uint8_t*>(typed_batch->notNull.data()) + offset
+          : nullptr;
+  const source_type* values = typed_batch->data.data() + offset;
+
+  // A lazy range converts values on demand instead of materializing a
+  // temporary converted buffer.
+  auto converted = internal::MakeLazyRange(
+      [values](int64_t i) { return static_cast<target_type>(values[i]); }, length);
+  RETURN_NOT_OK(
+      typed_builder->AppendValues(converted.begin(), converted.end(), validity));
+  return Status::OK();
+}
+
+// Append `length` booleans, starting at row `offset`. ORC booleans arrive in a
+// LongVectorBatch; static_cast<bool> maps any non-zero value to true.
+Status AppendBoolBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, int64_t length,
+                       ArrayBuilder* abuilder) {
+  auto* bool_builder = checked_cast<BooleanBuilder*>(abuilder);
+  auto* long_batch = checked_cast<liborc::LongVectorBatch*>(cbatch);
+  if (length == 0) return Status::OK();
+
+  const uint8_t* validity =
+      long_batch->hasNulls
+          ? reinterpret_cast<const uint8_t*>(long_batch->notNull.data()) + offset
+          : nullptr;
+  const int64_t* raw = long_batch->data.data() + offset;
+
+  auto as_bool =
+      internal::MakeLazyRange([raw](int64_t i) { return static_cast<bool>(raw[i]); },
+                              length);
+  RETURN_NOT_OK(bool_builder->AppendValues(as_bool.begin(), as_bool.end(), validity));
+  return Status::OK();
+}
+
+// Append `length` timestamps, starting at row `offset`. ORC keeps whole
+// seconds and a nanosecond part in separate buffers. They are combined into a
+// single nanosecond count for the Arrow TimestampBuilder.
+Status AppendTimestampBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                            int64_t length, ArrayBuilder* abuilder) {
+  auto* ts_builder = checked_cast<TimestampBuilder*>(abuilder);
+  auto* ts_batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+  if (length == 0) {
+    return Status::OK();
+  }
+
+  const uint8_t* validity =
+      ts_batch->hasNulls
+          ? reinterpret_cast<const uint8_t*>(ts_batch->notNull.data()) + offset
+          : nullptr;
+  const int64_t* secs = ts_batch->data.data() + offset;
+  const int64_t* nanos = ts_batch->nanoseconds.data() + offset;
+
+  // NOTE(review): secs[i] * kOneSecondNanos overflows int64 once |secs[i]|
+  // exceeds about 9.2e9 (roughly 292 years).
+  auto as_nanos = internal::MakeLazyRange(
+      [secs, nanos](int64_t i) { return secs[i] * kOneSecondNanos + nanos[i]; },
+      length);
+  RETURN_NOT_OK(ts_builder->AppendValues(as_nanos.begin(), as_nanos.end(), validity));
+  return Status::OK();
+}
+
+// Append `length` variable-length values, starting at row `offset`, from an
+// ORC StringVectorBatch. builder_type is StringBuilder or BinaryBuilder.
+// Each value is a (data pointer, length) pair in the batch.
+template <class builder_type>
+Status AppendBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                         int64_t length, ArrayBuilder* abuilder) {
+  auto* binary_builder = checked_cast<builder_type*>(abuilder);
+  auto* string_batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+
+  const int64_t end_row = offset + length;
+  for (int64_t row = offset; row < end_row; ++row) {
+    if (string_batch->hasNulls && !string_batch->notNull[row]) {
+      RETURN_NOT_OK(binary_builder->AppendNull());
+      continue;
+    }
+    const auto value_length = static_cast<int32_t>(string_batch->length[row]);
+    RETURN_NOT_OK(binary_builder->Append(string_batch->data[row], value_length));
+  }
+  return Status::OK();
+}
+
+// Append `length` ORC CHAR values, starting at row `offset`, to a
+// FixedSizeBinaryBuilder. The builder copies its configured byte width from
+// each data pointer.
+// NOTE(review): this assumes every CHAR value holds at least that many bytes
+// (i.e. liborc pads CHAR values). Confirm against liborc.
+Status AppendFixedBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                              int64_t length, ArrayBuilder* abuilder) {
+  auto* fixed_builder = checked_cast<FixedSizeBinaryBuilder*>(abuilder);
+  auto* string_batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+
+  const int64_t end_row = offset + length;
+  for (int64_t row = offset; row < end_row; ++row) {
+    if (string_batch->hasNulls && !string_batch->notNull[row]) {
+      RETURN_NOT_OK(fixed_builder->AppendNull());
+    } else {
+      RETURN_NOT_OK(fixed_builder->Append(string_batch->data[row]));
+    }
+  }
+  return Status::OK();
+}
+
+// Append `length` decimals, starting at row `offset`, to a Decimal128Builder.
+// ORC uses 128-bit storage (Decimal128VectorBatch) when the precision is 0
+// (legacy Hive "maximum precision") or above 18 digits. Otherwise it uses
+// 64-bit storage (Decimal64VectorBatch).
+Status AppendDecimalBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto* decimal_builder = checked_cast<Decimal128Builder*>(abuilder);
+  const bool has_nulls = cbatch->hasNulls;
+  const auto precision = type->getPrecision();
+  const bool is_wide = precision == 0 || precision > 18;
+  const int64_t end_row = offset + length;
+
+  if (is_wide) {
+    auto* wide_batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
+    for (int64_t row = offset; row < end_row; ++row) {
+      if (has_nulls && !wide_batch->notNull[row]) {
+        RETURN_NOT_OK(decimal_builder->AppendNull());
+      } else {
+        const auto& value = wide_batch->values[row];
+        RETURN_NOT_OK(
+            decimal_builder->Append(Decimal128(value.getHighBits(), value.getLowBits())));
+      }
+    }
+    return Status::OK();
+  }
+
+  auto* narrow_batch = checked_cast<liborc::Decimal64VectorBatch*>(cbatch);
+  for (int64_t row = offset; row < end_row; ++row) {
+    if (has_nulls && !narrow_batch->notNull[row]) {
+      RETURN_NOT_OK(decimal_builder->AppendNull());
+    } else {
+      RETURN_NOT_OK(decimal_builder->Append(Decimal128(narrow_batch->values[row])));
+    }
+  }
+  return Status::OK();
+}
+
+// Dispatch on the ORC type kind and append rows [offset, offset + length) of
+// `batch` to `builder`. The builder must be of the Arrow type that
+// GetArrowType produces for `type`. A nullptr `type` (unselected column)
+// appends nothing.
+Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
+                   int64_t offset, int64_t length, ArrayBuilder* builder) {
+  if (!type) {
+    return Status::OK();
+  }
+  const liborc::TypeKind kind = type->getKind();
+  switch (kind) {
+    // Nested types recurse back into AppendBatch for their children.
+    case liborc::STRUCT:
+      return AppendStructBatch(type, batch, offset, length, builder);
+    case liborc::LIST:
+      return AppendListBatch(type, batch, offset, length, builder);
+    case liborc::MAP:
+      return AppendMapBatch(type, batch, offset, length, builder);
+
+    // Integer-like kinds all arrive in a LongVectorBatch.
+    case liborc::LONG:
+      return AppendNumericBatch<Int64Builder, liborc::LongVectorBatch, int64_t>(
+          batch, offset, length, builder);
+    case liborc::INT:
+      return AppendNumericBatchCast<Int32Builder, int32_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::SHORT:
+      return AppendNumericBatchCast<Int16Builder, int16_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::BYTE:
+      return AppendNumericBatchCast<Int8Builder, int8_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::DATE:
+      return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::BOOLEAN:
+      return AppendBoolBatch(batch, offset, length, builder);
+
+    // Floating-point kinds arrive in a DoubleVectorBatch.
+    case liborc::DOUBLE:
+      return AppendNumericBatch<DoubleBuilder, liborc::DoubleVectorBatch, double>(
+          batch, offset, length, builder);
+    case liborc::FLOAT:
+      return AppendNumericBatchCast<FloatBuilder, float, liborc::DoubleVectorBatch,
+                                    double>(batch, offset, length, builder);
+
+    // Character and binary kinds arrive in a StringVectorBatch.
+    case liborc::VARCHAR:
+    case liborc::STRING:
+      return AppendBinaryBatch<StringBuilder>(batch, offset, length, builder);
+    case liborc::BINARY:
+      return AppendBinaryBatch<BinaryBuilder>(batch, offset, length, builder);
+    case liborc::CHAR:
+      return AppendFixedBinaryBatch(batch, offset, length, builder);
+
+    case liborc::TIMESTAMP:
+      return AppendTimestampBatch(batch, offset, length, builder);
+    case liborc::DECIMAL:
+      return AppendDecimalBatch(type, batch, offset, length, builder);
+
+    default:
+      return Status::NotImplemented("Not implemented type kind: ", kind);
+  }
+}
+
+// Map an ORC type onto the Arrow type used to hold it. Nested kinds are
+// converted recursively:
+//   - ORC maps become list<struct<key, value>>.
+//   - ORC unions become union types whose children are named "_union_<i>".
+Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) {
+  // When subselecting fields on read, liborc sets unselected nodes to
+  // nullptr. A missing node is represented by the Arrow null type.
+  if (!type) {
+    *out = null();
+    return Status::OK();
+  }
+
+  const liborc::TypeKind kind = type->getKind();
+  const int num_children = static_cast<int>(type->getSubtypeCount());
+
+  switch (kind) {
+    case liborc::BOOLEAN:
+      *out = boolean();
+      return Status::OK();
+    case liborc::BYTE:
+      *out = int8();
+      return Status::OK();
+    case liborc::SHORT:
+      *out = int16();
+      return Status::OK();
+    case liborc::INT:
+      *out = int32();
+      return Status::OK();
+    case liborc::LONG:
+      *out = int64();
+      return Status::OK();
+    case liborc::FLOAT:
+      *out = float32();
+      return Status::OK();
+    case liborc::DOUBLE:
+      *out = float64();
+      return Status::OK();
+    case liborc::VARCHAR:
+    case liborc::STRING:
+      *out = utf8();
+      return Status::OK();
+    case liborc::BINARY:
+      *out = binary();
+      return Status::OK();
+    case liborc::CHAR:
+      *out = fixed_size_binary(static_cast<int>(type->getMaximumLength()));
+      return Status::OK();
+    case liborc::TIMESTAMP:
+      *out = timestamp(TimeUnit::NANO);
+      return Status::OK();
+    case liborc::DATE:
+      *out = date32();
+      return Status::OK();
+    case liborc::DECIMAL: {
+      const int precision = static_cast<int>(type->getPrecision());
+      const int scale = static_cast<int>(type->getScale());
+      // In HIVE 0.11/0.12 precision is set as 0, but means max precision.
+      *out = (precision == 0) ? decimal(38, 6) : decimal(precision, scale);
+      return Status::OK();
+    }
+    case liborc::LIST: {
+      if (num_children != 1) {
+        return Status::Invalid("Invalid Orc List type");
+      }
+      std::shared_ptr<DataType> value_type;
+      RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &value_type));
+      *out = list(value_type);
+      return Status::OK();
+    }
+    case liborc::MAP: {
+      if (num_children != 2) {
+        return Status::Invalid("Invalid Orc Map type");
+      }
+      std::shared_ptr<DataType> key_type;
+      std::shared_ptr<DataType> item_type;
+      RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &key_type));
+      RETURN_NOT_OK(GetArrowType(type->getSubtype(1), &item_type));
+      *out = list(struct_({field("key", key_type), field("value", item_type)}));
+      return Status::OK();
+    }
+    case liborc::STRUCT: {
+      std::vector<std::shared_ptr<Field>> children;
+      for (int i = 0; i < num_children; ++i) {
+        std::shared_ptr<DataType> child_type;
+        RETURN_NOT_OK(GetArrowType(type->getSubtype(i), &child_type));
+        children.push_back(field(type->getFieldName(i), child_type));
+      }
+      *out = struct_(children);
+      return Status::OK();
+    }
+    case liborc::UNION: {
+      std::vector<std::shared_ptr<Field>> members;
+      std::vector<uint8_t> codes;
+      for (int i = 0; i < num_children; ++i) {
+        std::shared_ptr<DataType> member_type;
+        RETURN_NOT_OK(GetArrowType(type->getSubtype(i), &member_type));
+        members.push_back(field("_union_" + std::to_string(i), member_type));
+        codes.push_back(static_cast<uint8_t>(i));
+      }
+      *out = union_(members, codes);
+      return Status::OK();
+    }
+    default:
+      return Status::Invalid("Unknown Orc type kind: ", kind);
+  }
+}
+
+} // namespace orc
+} // namespace adapters
+} // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/adapter_util.h b/cpp/src/arrow/adapters/orc/adapter_util.h
new file mode 100644
index 0000000..eede230
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter_util.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Include guard is path-qualified (and spelled correctly) so it cannot
+// collide with util headers of other adapters.
+#ifndef ARROW_ADAPTERS_ORC_ADAPTER_UTIL_H
+#define ARROW_ADAPTERS_ORC_ADAPTER_UTIL_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/array/builder_base.h"
+#include "arrow/status.h"
+#include "orc/OrcFile.hh"
+
+namespace liborc = orc;
+
+namespace arrow {
+
+namespace adapters {
+
+namespace orc {
+
+/// \brief Convert an ORC type into the Arrow data type used to hold it.
+///
+/// A nullptr `type` (liborc uses nullptr for columns that were not selected)
+/// maps to the Arrow null type.
+Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out);
+
+/// \brief Append rows [offset, offset + length) of an ORC column batch to an
+/// Arrow builder.
+///
+/// The builder must be of the Arrow type that GetArrowType produces for
+/// `type`. A nullptr `type` appends nothing.
+Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
+                   int64_t offset, int64_t length, ArrayBuilder* builder);
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
+
+#endif  // ARROW_ADAPTERS_ORC_ADAPTER_UTIL_H