You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/03/25 04:22:23 UTC

[arrow] branch master updated: ARROW-4772: [C++] new ORC adapter interface for stripe and row iteration

This is an automated email from the ASF dual-hosted git repository.

emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3129732  ARROW-4772: [C++] new ORC adapter interface for stripe and row iteration
3129732 is described below

commit 3129732a18210d0c8921b45f79be4f34eadf0cc3
Author: Yurui Zhou <yu...@alibaba-inc.com>
AuthorDate: Sun Mar 24 21:21:50 2019 -0700

    ARROW-4772: [C++] new ORC adapter interface for stripe and row iteration
    
    Improvement of the current ORC adapter interface that enables the following operations:
    
    - enable seek operation to designated row
    - enable iteration over stripe with StripeReader
        - StripeReader is necessary since ORC supports stripe-level dictionary
           encoding; for this reason the Arrow Schema could vary between
           stripes if a dictionary-based type is enabled.
    - enable row level iteration with StripeReader
    
    Author: Yurui Zhou <yu...@alibaba-inc.com>
    
    Closes #3843 from yuruiz/OrcAdapterInterface and squashes the following commits:
    
    9c3229b5 <Yurui Zhou> resolve comments
    e3911374 <Yurui Zhou> resolve comments
    74ea0727 <Yurui Zhou> fix clang format error
    94d34827 <Yurui Zhou> resolve comments
    0a525dcb <Yurui Zhou> Resolve comments
    b241b64d <Yurui Zhou> fix lint error
    bf12efed <Yurui Zhou> remove unnecessary deprecation mark
    340f4f96 <Yurui Zhou> resolve code style issues
    1f5ecdce <Yurui Zhou> fix clang format error
    0547348f <Yurui Zhou> Fix cpplint errors
    1d258540 <Yurui Zhou> Fix cmake format error
    eb187c0e <Yurui Zhou> ARROW-4713: new ORC adapter interface for stripe and row iteration Improvement of the current ORC adapter interface that enables the following operations: - enable seek operation to designated row - enable iteration over stripe with StripeReader   - StripeReader is necessary since ORC supports stripe-level dictionary   encoding; for this reason the Arrow Schema could vary between   stripes if a dictionary-based type is enabled. - enable row level iteration with StripeReader
---
 cpp/src/arrow/CMakeLists.txt               |   2 +-
 cpp/src/arrow/adapters/orc/CMakeLists.txt  |  19 ++
 cpp/src/arrow/adapters/orc/adapter-test.cc | 158 ++++++++++
 cpp/src/arrow/adapters/orc/adapter.cc      | 485 +++++++----------------------
 cpp/src/arrow/adapters/orc/adapter.h       |  31 +-
 cpp/src/arrow/adapters/orc/adapter_util.cc | 427 +++++++++++++++++++++++++
 cpp/src/arrow/adapters/orc/adapter_util.h  |  44 +++
 7 files changed, 792 insertions(+), 374 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 865c453..3ff8ba1 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -202,7 +202,7 @@ endif()
 
 if(ARROW_ORC)
   add_subdirectory(adapters/orc)
-  set(ARROW_SRCS adapters/orc/adapter.cc ${ARROW_SRCS})
+  set(ARROW_SRCS adapters/orc/adapter.cc adapters/orc/adapter_util.cc ${ARROW_SRCS})
 endif()
 
 if(ARROW_TENSORFLOW)
diff --git a/cpp/src/arrow/adapters/orc/CMakeLists.txt b/cpp/src/arrow/adapters/orc/CMakeLists.txt
index 6c8b47e..97bff89 100644
--- a/cpp/src/arrow/adapters/orc/CMakeLists.txt
+++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt
@@ -26,3 +26,22 @@ install(FILES adapter.h DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/
 configure_file(arrow-orc.pc.in "${CMAKE_CURRENT_BINARY_DIR}/arrow-orc.pc" @ONLY)
 install(FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-orc.pc"
         DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
+
+# Libraries every ORC adapter test links against, plus the platform-specific
+# threading / dynamic-loading libraries.
+set(ORC_MIN_TEST_LIBS GTest::Main GTest::GTest)
+if(APPLE)
+  list(APPEND ORC_MIN_TEST_LIBS ${CMAKE_DL_LIBS})
+elseif(NOT MSVC)
+  list(APPEND ORC_MIN_TEST_LIBS pthread ${CMAKE_DL_LIBS})
+endif()
+
+# Use the static Arrow libraries when ARROW_BUILD_STATIC is set, the shared ones
+# otherwise.
+if(ARROW_BUILD_STATIC)
+  set(ARROW_LIBRARIES_FOR_STATIC_TESTS arrow_testing_static arrow_static)
+else()
+  set(ARROW_LIBRARIES_FOR_STATIC_TESTS arrow_testing_shared arrow_shared)
+endif()
+
+set(ORC_STATIC_TEST_LINK_LIBS ${ORC_MIN_TEST_LIBS} ${ARROW_LIBRARIES_FOR_STATIC_TESTS}
+    orc_static)
+
+add_arrow_test(adapter-test PREFIX "orc" STATIC_LINK_LIBS ${ORC_STATIC_TEST_LINK_LIBS})
diff --git a/cpp/src/arrow/adapters/orc/adapter-test.cc b/cpp/src/arrow/adapters/orc/adapter-test.cc
new file mode 100644
index 0000000..f42144e
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter-test.cc
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/adapters/orc/adapter.h"
+#include "arrow/array.h"
+#include "arrow/io/api.h"
+
+#include <gtest/gtest.h>
+#include <orc/OrcFile.hh>
+
+namespace liborc = orc;
+
+namespace arrow {
+
+constexpr int DEFAULT_MEM_STREAM_SIZE = 100 * 1024 * 1024;
+
+// In-memory liborc::OutputStream used by the test to capture the bytes produced by
+// the ORC writer. The buffer is pre-sized to `capacity` and grows if more is written.
+class MemoryOutputStream : public liborc::OutputStream {
+ public:
+  explicit MemoryOutputStream(ssize_t capacity)
+      : data_(capacity), name_("MemoryOutputStream"), length_(0) {}
+
+  // Number of bytes written since construction or the last reset().
+  uint64_t getLength() const override { return length_; }
+
+  uint64_t getNaturalWriteSize() const override { return natural_write_size_; }
+
+  // Appends `size` bytes from `buf` at the current write position.
+  void write(const void* buf, size_t size) override {
+    // Grow the buffer instead of copying past its end when capacity is exhausted.
+    if (size > data_.size() - length_) {
+      data_.resize(length_ + size);
+    }
+    memcpy(data_.data() + length_, buf, size);
+    length_ += size;
+  }
+
+  const std::string& getName() const override { return name_; }
+
+  // Start of the written bytes. The pointer is invalidated if a later write() has
+  // to grow the buffer.
+  const char* getData() const { return data_.data(); }
+
+  void close() override {}
+
+  // Rewinds the write position to the start; existing bytes are left in place.
+  void reset() { length_ = 0; }
+
+ private:
+  std::vector<char> data_;
+  std::string name_;
+  uint64_t length_;
+  // Reported through getNaturalWriteSize(). It used to be left uninitialized, so the
+  // getter returned an indeterminate value; 2048 is an arbitrary test-only choice.
+  uint64_t natural_write_size_ = 2048;
+};
+
+// Builds an ORC writer that emits rows of `type` into `stream`. It uses the default
+// liborc memory pool, the requested stripe size, a 1024-byte compression block size
+// and a row index stride of 0.
+std::unique_ptr<liborc::Writer> CreateWriter(uint64_t stripe_size,
+                                             const liborc::Type& type,
+                                             liborc::OutputStream* stream) {
+  liborc::WriterOptions writer_options;
+  writer_options.setMemoryPool(liborc::getDefaultPool());
+  writer_options.setStripeSize(stripe_size);
+  writer_options.setCompressionBlockSize(1024);
+  writer_options.setRowIndexStride(0);
+  return liborc::createWriter(type, stream, writer_options);
+}
+
+// Writes an ORC file with an int and a string column into memory, then checks that
+// (1) iterating stripe by stripe returns every row in order, and (2) Seek() followed
+// by NextStripeReader() resumes from the requested row.
+//
+// Steps whose result is dereferenced or drives a loop use ASSERT_* so that a failure
+// stops the test instead of crashing on a null pointer or looping on a stale batch.
+TEST(TestAdapter, readIntAndStringFileMultipleStripes) {
+  MemoryOutputStream mem_stream(DEFAULT_MEM_STREAM_SIZE);
+  ORC_UNIQUE_PTR<liborc::Type> type(
+      liborc::Type::buildTypeFromString("struct<col1:int,col2:string>"));
+
+  constexpr uint64_t stripe_size = 1024;  // 1K
+  constexpr uint64_t stripe_count = 10;
+  constexpr uint64_t stripe_row_count = 65535;
+  constexpr uint64_t reader_batch_size = 1024;
+
+  auto writer = CreateWriter(stripe_size, *type, &mem_stream);
+  auto batch = writer->createRowBatch(stripe_row_count);
+  auto struct_batch = dynamic_cast<liborc::StructVectorBatch*>(batch.get());
+  ASSERT_TRUE(struct_batch != nullptr);
+  auto long_batch = dynamic_cast<liborc::LongVectorBatch*>(struct_batch->fields[0]);
+  auto str_batch = dynamic_cast<liborc::StringVectorBatch*>(struct_batch->fields[1]);
+  ASSERT_TRUE(long_batch != nullptr);
+  ASSERT_TRUE(str_batch != nullptr);
+  int64_t accumulated = 0;
+
+  for (uint64_t j = 0; j < stripe_count; ++j) {
+    // Backing storage for the string column. Values are in [0, 65534], i.e. at most
+    // 5 digits each, so 65535 * 5 = 327675 bytes are enough.
+    char data_buffer[327675];
+    uint64_t offset = 0;
+    for (uint64_t i = 0; i < stripe_row_count; ++i) {
+      std::string str_data = std::to_string(accumulated % stripe_row_count);
+      long_batch->data[i] = static_cast<int64_t>(accumulated % stripe_row_count);
+      str_batch->data[i] = data_buffer + offset;
+      str_batch->length[i] = static_cast<int64_t>(str_data.size());
+      memcpy(data_buffer + offset, str_data.c_str(), str_data.size());
+      accumulated++;
+      offset += str_data.size();
+    }
+    struct_batch->numElements = stripe_row_count;
+    long_batch->numElements = stripe_row_count;
+    str_batch->numElements = stripe_row_count;
+
+    writer->add(*batch);
+  }
+
+  writer->close();
+
+  std::shared_ptr<io::RandomAccessFile> in_stream(new io::BufferReader(
+      std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()),
+                               static_cast<int64_t>(mem_stream.getLength()))));
+
+  std::unique_ptr<adapters::orc::ORCFileReader> reader;
+  ASSERT_TRUE(
+      adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool(), &reader).ok());
+
+  ASSERT_EQ(stripe_row_count * stripe_count, reader->NumberOfRows());
+  ASSERT_EQ(stripe_count, reader->NumberOfStripes());
+  accumulated = 0;
+  std::shared_ptr<RecordBatchReader> stripe_reader;
+  ASSERT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+  while (stripe_reader) {
+    std::shared_ptr<RecordBatch> record_batch;
+    ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+    while (record_batch) {
+      auto int32_array = std::dynamic_pointer_cast<Int32Array>(record_batch->column(0));
+      auto str_array = std::dynamic_pointer_cast<StringArray>(record_batch->column(1));
+      ASSERT_TRUE(int32_array != nullptr);
+      ASSERT_TRUE(str_array != nullptr);
+      for (int j = 0; j < record_batch->num_rows(); ++j) {
+        EXPECT_EQ(accumulated % stripe_row_count, int32_array->Value(j));
+        EXPECT_EQ(std::to_string(accumulated % stripe_row_count),
+                  str_array->GetString(j));
+        accumulated++;
+      }
+      ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+    }
+    ASSERT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+  }
+  // Every row of every stripe must have been visited by the loop above.
+  EXPECT_EQ(static_cast<int64_t>(stripe_row_count * stripe_count), accumulated);
+
+  // test seek operation
+  int64_t start_offset = 830;
+  ASSERT_TRUE(reader->Seek(stripe_row_count + start_offset).ok());
+
+  ASSERT_TRUE(reader->NextStripeReader(reader_batch_size, &stripe_reader).ok());
+  ASSERT_TRUE(stripe_reader != nullptr);
+  std::shared_ptr<RecordBatch> record_batch;
+  ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+  int64_t rows_after_seek = 0;
+  while (record_batch) {
+    auto int32_array = std::dynamic_pointer_cast<Int32Array>(record_batch->column(0));
+    auto str_array = std::dynamic_pointer_cast<StringArray>(record_batch->column(1));
+    ASSERT_TRUE(int32_array != nullptr);
+    ASSERT_TRUE(str_array != nullptr);
+    for (int j = 0; j < record_batch->num_rows(); ++j) {
+      EXPECT_EQ(start_offset % stripe_row_count, int32_array->Value(j));
+      EXPECT_EQ(std::to_string(start_offset % stripe_row_count),
+                str_array->GetString(j));
+      start_offset++;
+    }
+    rows_after_seek += record_batch->num_rows();
+    ASSERT_TRUE(stripe_reader->ReadNext(&record_batch).ok());
+  }
+  // Guard against a vacuous pass: the seek target lies inside the file, so the
+  // stripe reader must have produced at least one row.
+  EXPECT_GT(rows_after_seek, 0);
+}
+}  // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc
index 78a321f..89ad597 100644
--- a/cpp/src/arrow/adapters/orc/adapter.cc
+++ b/cpp/src/arrow/adapters/orc/adapter.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "arrow/adapters/orc/adapter.h"
+#include "arrow/adapters/orc/adapter_util.h"
 
 #include <algorithm>
 #include <cstdint>
@@ -102,120 +103,58 @@ struct StripeInformation {
   uint64_t offset;
   uint64_t length;
   uint64_t num_rows;
+  uint64_t first_row_of_stripe;
 };
 
-Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) {
-  // When subselecting fields on read, liborc will set some nodes to nullptr,
-  // so we need to check for nullptr before progressing
-  if (type == nullptr) {
-    *out = null();
-    return Status::OK();
-  }
-  liborc::TypeKind kind = type->getKind();
-  const int subtype_count = static_cast<int>(type->getSubtypeCount());
-
-  switch (kind) {
-    case liborc::BOOLEAN:
-      *out = boolean();
-      break;
-    case liborc::BYTE:
-      *out = int8();
-      break;
-    case liborc::SHORT:
-      *out = int16();
-      break;
-    case liborc::INT:
-      *out = int32();
-      break;
-    case liborc::LONG:
-      *out = int64();
-      break;
-    case liborc::FLOAT:
-      *out = float32();
-      break;
-    case liborc::DOUBLE:
-      *out = float64();
-      break;
-    case liborc::VARCHAR:
-    case liborc::STRING:
-      *out = utf8();
-      break;
-    case liborc::BINARY:
-      *out = binary();
-      break;
-    case liborc::CHAR:
-      *out = fixed_size_binary(static_cast<int>(type->getMaximumLength()));
-      break;
-    case liborc::TIMESTAMP:
-      *out = timestamp(TimeUnit::NANO);
-      break;
-    case liborc::DATE:
-      *out = date32();
-      break;
-    case liborc::DECIMAL: {
-      const int precision = static_cast<int>(type->getPrecision());
-      const int scale = static_cast<int>(type->getScale());
-      if (precision == 0) {
-        // In HIVE 0.11/0.12 precision is set as 0, but means max precision
-        *out = decimal(38, 6);
-      } else {
-        *out = decimal(precision, scale);
-      }
-      break;
-    }
-    case liborc::LIST: {
-      if (subtype_count != 1) {
-        return Status::Invalid("Invalid Orc List type");
-      }
-      std::shared_ptr<DataType> elemtype;
-      RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &elemtype));
-      *out = list(elemtype);
-      break;
-    }
-    case liborc::MAP: {
-      if (subtype_count != 2) {
-        return Status::Invalid("Invalid Orc Map type");
-      }
-      std::shared_ptr<DataType> keytype;
-      std::shared_ptr<DataType> valtype;
-      RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &keytype));
-      RETURN_NOT_OK(GetArrowType(type->getSubtype(1), &valtype));
-      *out = list(struct_({field("key", keytype), field("value", valtype)}));
-      break;
+// The number of rows to read in a ColumnVectorBatch
+constexpr int64_t kReadRowsBatch = 1000;
+
+class OrcStripeReader : public RecordBatchReader {
+ public:
+  OrcStripeReader(std::unique_ptr<liborc::RowReader> row_reader,
+                  std::shared_ptr<Schema> schema, int64_t batch_size, MemoryPool* pool)
+      : row_reader_(std::move(row_reader)),
+        schema_(schema),
+        pool_(pool),
+        batch_size_{batch_size} {}
+
+  std::shared_ptr<Schema> schema() const override { return schema_; }
+
+  Status ReadNext(std::shared_ptr<RecordBatch>* out) override {
+    std::unique_ptr<liborc::ColumnVectorBatch> batch;
+    try {
+      batch = row_reader_->createRowBatch(batch_size_);
+    } catch (const liborc::ParseError& e) {
+      return Status::Invalid(e.what());
     }
-    case liborc::STRUCT: {
-      std::vector<std::shared_ptr<Field>> fields;
-      for (int child = 0; child < subtype_count; ++child) {
-        std::shared_ptr<DataType> elemtype;
-        RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
-        std::string name = type->getFieldName(child);
-        fields.push_back(field(name, elemtype));
-      }
-      *out = struct_(fields);
-      break;
+
+    const liborc::Type& type = row_reader_->getSelectedType();
+    if (!row_reader_->next(*batch)) {
+      out->reset();
+      return Status::OK();
     }
-    case liborc::UNION: {
-      std::vector<std::shared_ptr<Field>> fields;
-      std::vector<uint8_t> type_codes;
-      for (int child = 0; child < subtype_count; ++child) {
-        std::shared_ptr<DataType> elemtype;
-        RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
-        fields.push_back(field("_union_" + std::to_string(child), elemtype));
-        type_codes.push_back(static_cast<uint8_t>(child));
-      }
-      *out = union_(fields, type_codes);
-      break;
+
+    std::unique_ptr<RecordBatchBuilder> builder;
+    RETURN_NOT_OK(RecordBatchBuilder::Make(schema_, pool_, batch->numElements, &builder));
+
+    // The top-level type must be a struct to read into an arrow table
+    const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);
+
+    for (int i = 0; i < builder->num_fields(); i++) {
+      RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
+                                batch->numElements, builder->GetField(i)));
     }
-    default: { return Status::Invalid("Unknown Orc type kind: ", kind); }
-  }
-  return Status::OK();
-}
 
-// The number of rows to read in a ColumnVectorBatch
-constexpr int64_t kReadRowsBatch = 1000;
+    RETURN_NOT_OK(builder->Flush(out));
+    return Status::OK();
+  }
 
-// The numer of nanoseconds in a second
-constexpr int64_t kOneSecondNanos = 1000000000LL;
+ private:
+  std::unique_ptr<liborc::RowReader> row_reader_;
+  std::shared_ptr<Schema> schema_;
+  MemoryPool* pool_;
+  int64_t batch_size_;
+};
 
 class ORCFileReader::Impl {
  public:
@@ -233,6 +172,7 @@ class ORCFileReader::Impl {
     }
     pool_ = pool;
     reader_ = std::move(liborc_reader);
+    current_row_ = 0;
 
     return Init();
   }
@@ -241,10 +181,12 @@ class ORCFileReader::Impl {
     int64_t nstripes = reader_->getNumberOfStripes();
     stripes_.resize(nstripes);
     std::unique_ptr<liborc::StripeInformation> stripe;
+    uint64_t first_row_of_stripe = 0;
     for (int i = 0; i < nstripes; ++i) {
       stripe = reader_->getStripe(i);
-      stripes_[i] = StripeInformation(
-          {stripe->getOffset(), stripe->getLength(), stripe->getNumberOfRows()});
+      stripes_[i] = StripeInformation({stripe->getOffset(), stripe->getLength(),
+                                       stripe->getNumberOfRows(), first_row_of_stripe});
+      first_row_of_stripe += stripe->getNumberOfRows();
     }
     return Status::OK();
   }
@@ -349,6 +291,23 @@ class ORCFileReader::Impl {
     return Status::OK();
   }
 
+  // Restricts `opts` to the byte range of the stripe that contains `row_number` and
+  // copies that stripe's metadata into `out`.
+  //
+  // \param[in,out] opts reader options whose range is set to the matching stripe
+  // \param[in] row_number zero-based row index within the file
+  // \param[out] out metadata of the matching stripe
+  // \return Status::Invalid if row_number is negative, not smaller than
+  //         NumberOfRows(), or not covered by any stripe
+  Status SelectStripeWithRowNumber(liborc::RowReaderOptions* opts, int64_t row_number,
+                                   StripeInformation* out) {
+    ARROW_RETURN_IF(row_number < 0 || row_number >= NumberOfRows(),
+                    Status::Invalid("Out of bounds row number: ", row_number));
+
+    // Stripe boundaries are stored unsigned; the sign was checked above.
+    const auto row = static_cast<uint64_t>(row_number);
+    for (const auto& stripe : stripes_) {
+      if (row >= stripe.first_row_of_stripe &&
+          row < stripe.first_row_of_stripe + stripe.num_rows) {
+        opts->range(stripe.offset, stripe.length);
+        *out = stripe;
+        return Status::OK();
+      }
+    }
+
+    // Only reached if no stripe in stripes_ covers the requested row.
+    return Status::Invalid("Invalid row number: ", row_number);
+  }
+
   Status SelectIndices(liborc::RowReaderOptions* opts,
                        const std::vector<int>& include_indices) {
     std::list<uint64_t> include_indices_list;
@@ -374,11 +333,11 @@ class ORCFileReader::Impl {
   Status ReadBatch(const liborc::RowReaderOptions& opts,
                    const std::shared_ptr<Schema>& schema, int64_t nrows,
                    std::shared_ptr<RecordBatch>* out) {
-    std::unique_ptr<liborc::RowReader> rowreader;
+    std::unique_ptr<liborc::RowReader> row_reader;
     std::unique_ptr<liborc::ColumnVectorBatch> batch;
     try {
-      rowreader = reader_->createRowReader(opts);
-      batch = rowreader->createRowBatch(std::min(nrows, kReadRowsBatch));
+      row_reader = reader_->createRowReader(opts);
+      batch = row_reader->createRowBatch(std::min(nrows, kReadRowsBatch));
     } catch (const liborc::ParseError& e) {
       return Status::Invalid(e.what());
     }
@@ -388,8 +347,8 @@ class ORCFileReader::Impl {
     // The top-level type must be a struct to read into an arrow table
     const auto& struct_batch = checked_cast<liborc::StructVectorBatch&>(*batch);
 
-    const liborc::Type& type = rowreader->getSelectedType();
-    while (rowreader->next(*batch)) {
+    const liborc::Type& type = row_reader->getSelectedType();
+    while (row_reader->next(*batch)) {
       for (int i = 0; i < builder->num_fields(); i++) {
         RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
                                   batch->numElements, builder->GetField(i)));
@@ -399,283 +358,52 @@ class ORCFileReader::Impl {
     return Status::OK();
   }
 
-  Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
-                     int64_t offset, int64_t length, ArrayBuilder* builder) {
-    if (type == nullptr) {
-      return Status::OK();
-    }
-    liborc::TypeKind kind = type->getKind();
-    switch (kind) {
-      case liborc::STRUCT:
-        return AppendStructBatch(type, batch, offset, length, builder);
-      case liborc::LIST:
-        return AppendListBatch(type, batch, offset, length, builder);
-      case liborc::MAP:
-        return AppendMapBatch(type, batch, offset, length, builder);
-      case liborc::LONG:
-        return AppendNumericBatch<Int64Builder, liborc::LongVectorBatch, int64_t>(
-            batch, offset, length, builder);
-      case liborc::INT:
-        return AppendNumericBatchCast<Int32Builder, int32_t, liborc::LongVectorBatch,
-                                      int64_t>(batch, offset, length, builder);
-      case liborc::SHORT:
-        return AppendNumericBatchCast<Int16Builder, int16_t, liborc::LongVectorBatch,
-                                      int64_t>(batch, offset, length, builder);
-      case liborc::BYTE:
-        return AppendNumericBatchCast<Int8Builder, int8_t, liborc::LongVectorBatch,
-                                      int64_t>(batch, offset, length, builder);
-      case liborc::DOUBLE:
-        return AppendNumericBatch<DoubleBuilder, liborc::DoubleVectorBatch, double>(
-            batch, offset, length, builder);
-      case liborc::FLOAT:
-        return AppendNumericBatchCast<FloatBuilder, float, liborc::DoubleVectorBatch,
-                                      double>(batch, offset, length, builder);
-      case liborc::BOOLEAN:
-        return AppendBoolBatch(batch, offset, length, builder);
-      case liborc::VARCHAR:
-      case liborc::STRING:
-        return AppendBinaryBatch<StringBuilder>(batch, offset, length, builder);
-      case liborc::BINARY:
-        return AppendBinaryBatch<BinaryBuilder>(batch, offset, length, builder);
-      case liborc::CHAR:
-        return AppendFixedBinaryBatch(batch, offset, length, builder);
-      case liborc::DATE:
-        return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch,
-                                      int64_t>(batch, offset, length, builder);
-      case liborc::TIMESTAMP:
-        return AppendTimestampBatch(batch, offset, length, builder);
-      case liborc::DECIMAL:
-        return AppendDecimalBatch(type, batch, offset, length, builder);
-      default:
-        return Status::NotImplemented("Not implemented type kind: ", kind);
-    }
-  }
+  Status Seek(int64_t row_number) {
+    ARROW_RETURN_IF(row_number >= NumberOfRows(),
+                    Status::Invalid("Out of bounds row number: ", row_number));
 
-  Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
-                           int64_t offset, int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<StructBuilder*>(abuilder);
-    auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
-
-    const uint8_t* valid_bytes = nullptr;
-    if (batch->hasNulls) {
-      valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
-    }
-    RETURN_NOT_OK(builder->AppendValues(length, valid_bytes));
-
-    for (int i = 0; i < builder->num_fields(); i++) {
-      RETURN_NOT_OK(AppendBatch(type->getSubtype(i), batch->fields[i], offset, length,
-                                builder->field_builder(i)));
-    }
+    current_row_ = row_number;
     return Status::OK();
   }
 
-  Status AppendListBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
-                         int64_t offset, int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<ListBuilder*>(abuilder);
-    auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
-    liborc::ColumnVectorBatch* elements = batch->elements.get();
-    const liborc::Type* elemtype = type->getSubtype(0);
-
-    const bool has_nulls = batch->hasNulls;
-    for (int64_t i = offset; i < length + offset; i++) {
-      if (!has_nulls || batch->notNull[i]) {
-        int64_t start = batch->offsets[i];
-        int64_t end = batch->offsets[i + 1];
-        RETURN_NOT_OK(builder->Append());
-        RETURN_NOT_OK(AppendBatch(elemtype, elements, start, end - start,
-                                  builder->value_builder()));
-      } else {
-        RETURN_NOT_OK(builder->AppendNull());
-      }
-    }
-    return Status::OK();
-  }
-
-  Status AppendMapBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
-                        int64_t offset, int64_t length, ArrayBuilder* abuilder) {
-    auto list_builder = checked_cast<ListBuilder*>(abuilder);
-    auto struct_builder = checked_cast<StructBuilder*>(list_builder->value_builder());
-    auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch);
-    liborc::ColumnVectorBatch* keys = batch->keys.get();
-    liborc::ColumnVectorBatch* vals = batch->elements.get();
-    const liborc::Type* keytype = type->getSubtype(0);
-    const liborc::Type* valtype = type->getSubtype(1);
-
-    const bool has_nulls = batch->hasNulls;
-    for (int64_t i = offset; i < length + offset; i++) {
-      RETURN_NOT_OK(list_builder->Append());
-      int64_t start = batch->offsets[i];
-      int64_t list_length = batch->offsets[i + 1] - start;
-      if (list_length && (!has_nulls || batch->notNull[i])) {
-        RETURN_NOT_OK(struct_builder->AppendValues(list_length, nullptr));
-        RETURN_NOT_OK(AppendBatch(keytype, keys, start, list_length,
-                                  struct_builder->field_builder(0)));
-        RETURN_NOT_OK(AppendBatch(valtype, vals, start, list_length,
-                                  struct_builder->field_builder(1)));
-      }
-    }
-    return Status::OK();
-  }
-
-  template <class builder_type, class batch_type, class elem_type>
-  Status AppendNumericBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
-                            int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<builder_type*>(abuilder);
-    auto batch = checked_cast<batch_type*>(cbatch);
-
-    if (length == 0) {
-      return Status::OK();
-    }
-    const uint8_t* valid_bytes = nullptr;
-    if (batch->hasNulls) {
-      valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
-    }
-    const elem_type* source = batch->data.data() + offset;
-    RETURN_NOT_OK(builder->AppendValues(source, length, valid_bytes));
-    return Status::OK();
-  }
-
-  template <class builder_type, class target_type, class batch_type, class source_type>
-  Status AppendNumericBatchCast(liborc::ColumnVectorBatch* cbatch, int64_t offset,
-                                int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<builder_type*>(abuilder);
-    auto batch = checked_cast<batch_type*>(cbatch);
-
-    if (length == 0) {
-      return Status::OK();
-    }
-
-    const uint8_t* valid_bytes = nullptr;
-    if (batch->hasNulls) {
-      valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
-    }
-    const source_type* source = batch->data.data() + offset;
-    auto cast_iter = internal::MakeLazyRange(
-        [&source](int64_t index) { return static_cast<target_type>(source[index]); },
-        length);
-
-    RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes));
-
-    return Status::OK();
-  }
-
-  Status AppendBoolBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
-                         int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<BooleanBuilder*>(abuilder);
-    auto batch = checked_cast<liborc::LongVectorBatch*>(cbatch);
-
-    if (length == 0) {
-      return Status::OK();
-    }
-
-    const uint8_t* valid_bytes = nullptr;
-    if (batch->hasNulls) {
-      valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
-    }
-    const int64_t* source = batch->data.data() + offset;
-
-    auto cast_iter = internal::MakeLazyRange(
-        [&source](int64_t index) { return static_cast<bool>(source[index]); }, length);
-
-    RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes));
-
-    return Status::OK();
-  }
-
-  Status AppendTimestampBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
-                              int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<TimestampBuilder*>(abuilder);
-    auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
-
-    if (length == 0) {
+  Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
+                          std::shared_ptr<RecordBatchReader>* out) {
+    if (current_row_ >= NumberOfRows()) {
+      out->reset();
       return Status::OK();
     }
 
-    const uint8_t* valid_bytes = nullptr;
-    if (batch->hasNulls) {
-      valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+    liborc::RowReaderOptions opts;
+    if (!include_indices.empty()) {
+      RETURN_NOT_OK(SelectIndices(&opts, include_indices));
     }
-
-    const int64_t* seconds = batch->data.data() + offset;
-    const int64_t* nanos = batch->nanoseconds.data() + offset;
-
-    auto transform_timestamp = [seconds, nanos](int64_t index) {
-      return seconds[index] * kOneSecondNanos + nanos[index];
-    };
-
-    auto transform_range = internal::MakeLazyRange(transform_timestamp, length);
-
-    RETURN_NOT_OK(builder->AppendValues(transform_range.begin(), transform_range.end(),
-                                        valid_bytes));
-    return Status::OK();
-  }
-
-  template <class builder_type>
-  Status AppendBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
-                           int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<builder_type*>(abuilder);
-    auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
-
-    const bool has_nulls = batch->hasNulls;
-    for (int64_t i = offset; i < length + offset; i++) {
-      if (!has_nulls || batch->notNull[i]) {
-        RETURN_NOT_OK(
-            builder->Append(batch->data[i], static_cast<int32_t>(batch->length[i])));
-      } else {
-        RETURN_NOT_OK(builder->AppendNull());
-      }
+    StripeInformation stripe_info;
+    RETURN_NOT_OK(SelectStripeWithRowNumber(&opts, current_row_, &stripe_info));
+    std::shared_ptr<Schema> schema;
+    RETURN_NOT_OK(ReadSchema(opts, &schema));
+    std::unique_ptr<liborc::RowReader> row_reader;
+    try {
+      row_reader = reader_->createRowReader(opts);
+      row_reader->seekToRow(current_row_);
+      current_row_ = stripe_info.first_row_of_stripe + stripe_info.num_rows;
+    } catch (const liborc::ParseError& e) {
+      return Status::Invalid(e.what());
     }
-    return Status::OK();
-  }
 
-  Status AppendFixedBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
-                                int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<FixedSizeBinaryBuilder*>(abuilder);
-    auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
-
-    const bool has_nulls = batch->hasNulls;
-    for (int64_t i = offset; i < length + offset; i++) {
-      if (!has_nulls || batch->notNull[i]) {
-        RETURN_NOT_OK(builder->Append(batch->data[i]));
-      } else {
-        RETURN_NOT_OK(builder->AppendNull());
-      }
-    }
+    *out = std::shared_ptr<RecordBatchReader>(
+        new OrcStripeReader(std::move(row_reader), schema, batch_size, pool_));
     return Status::OK();
   }
 
-  Status AppendDecimalBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
-                            int64_t offset, int64_t length, ArrayBuilder* abuilder) {
-    auto builder = checked_cast<Decimal128Builder*>(abuilder);
-
-    const bool has_nulls = cbatch->hasNulls;
-    if (type->getPrecision() == 0 || type->getPrecision() > 18) {
-      auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
-      for (int64_t i = offset; i < length + offset; i++) {
-        if (!has_nulls || batch->notNull[i]) {
-          RETURN_NOT_OK(builder->Append(
-              Decimal128(batch->values[i].getHighBits(), batch->values[i].getLowBits())));
-        } else {
-          RETURN_NOT_OK(builder->AppendNull());
-        }
-      }
-    } else {
-      auto batch = checked_cast<liborc::Decimal64VectorBatch*>(cbatch);
-      for (int64_t i = offset; i < length + offset; i++) {
-        if (!has_nulls || batch->notNull[i]) {
-          RETURN_NOT_OK(builder->Append(Decimal128(batch->values[i])));
-        } else {
-          RETURN_NOT_OK(builder->AppendNull());
-        }
-      }
-    }
-    return Status::OK();
+  Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out) {
+    return NextStripeReader(batch_size, {}, out);
   }
 
  private:
   MemoryPool* pool_;
   std::unique_ptr<liborc::Reader> reader_;
   std::vector<StripeInformation> stripes_;
+  int64_t current_row_;
 };
 
 ORCFileReader::ORCFileReader() { impl_.reset(new ORCFileReader::Impl()); }
@@ -721,6 +449,19 @@ Status ORCFileReader::ReadStripe(int64_t stripe, const std::vector<int>& include
   return impl_->ReadStripe(stripe, include_indices, out);
 }
 
+Status ORCFileReader::Seek(int64_t row_number) { return impl_->Seek(row_number); }
+
+Status ORCFileReader::NextStripeReader(int64_t batch_sizes,
+                                       std::shared_ptr<RecordBatchReader>* out) {
+  return impl_->NextStripeReader(batch_sizes, out);
+}
+
+Status ORCFileReader::NextStripeReader(int64_t batch_size,
+                                       const std::vector<int>& include_indices,
+                                       std::shared_ptr<RecordBatchReader>* out) {
+  return impl_->NextStripeReader(batch_size, include_indices, out);
+}
+
 int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
 
 int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h
index 5ae19b2..6279f68 100644
--- a/cpp/src/arrow/adapters/orc/adapter.h
+++ b/cpp/src/arrow/adapters/orc/adapter.h
@@ -41,7 +41,7 @@ class ARROW_EXPORT ORCFileReader {
  public:
   ~ORCFileReader();
 
-  /// \brief Create a new ORC reader
+  /// \brief Creates a new ORC reader.
   ///
   /// \param[in] file the data source
   /// \param[in] pool a MemoryPool to use for buffer allocations
@@ -102,6 +102,35 @@ class ARROW_EXPORT ORCFileReader {
   Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
                     std::shared_ptr<RecordBatch>* out);
 
+  /// \brief Seek to the designated row. Invoking NextStripeReader() after a seek
+  ///        returns a stripe reader starting from the designated row.
+  ///
+  /// \param[in] row_number the row number to seek to
+  Status Seek(int64_t row_number);
+
+  /// \brief Get a stripe level record batch iterator with specified row count
+  ///         in each record batch. NextStripeReader serves as a fine-grained
+  ///         alternative to ReadStripe, which may cause OOM issues by loading
+  ///         a whole stripe into memory.
+  ///
+  /// \param[in] batch_size the number of rows each record batch contains in
+  ///            record batch iteration.
+  /// \param[out] out the returned stripe reader
+  Status NextStripeReader(int64_t batch_size, std::shared_ptr<RecordBatchReader>* out);
+
+  /// \brief Get a stripe level record batch iterator with specified row count
+  ///         in each record batch. NextStripeReader serves as a fine-grained
+  ///         alternative to ReadStripe, which may cause OOM issues by loading
+  ///         a whole stripe into memory.
+  ///
+  /// \param[in] batch_size the number of rows each record batch contains in
+  ///            record batch iteration.
+  ///
+  /// \param[in] include_indices the selected field indices to read
+  /// \param[out] out the returned stripe reader
+  Status NextStripeReader(int64_t batch_size, const std::vector<int>& include_indices,
+                          std::shared_ptr<RecordBatchReader>* out);
+
   /// \brief The number of stripes in the file
   int64_t NumberOfStripes();
 
diff --git a/cpp/src/arrow/adapters/orc/adapter_util.cc b/cpp/src/arrow/adapters/orc/adapter_util.cc
new file mode 100644
index 0000000..235e5ba
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter_util.cc
@@ -0,0 +1,427 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <string>
+#include <vector>
+
+#include "arrow/adapters/orc/adapter_util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/builder.h"
+#include "arrow/util/checked_cast.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/lazy.h"
+
+#include "orc/Exceptions.hh"
+#include "orc/OrcFile.hh"
+
+// alias to not interfere with nested orc namespace
+namespace liborc = orc;
+
+namespace arrow {
+
+namespace adapters {
+
+namespace orc {
+
+using internal::checked_cast;
+
+// The number of nanoseconds in a second
+constexpr int64_t kOneSecondNanos = 1000000000LL;
+
+Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                         int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<StructBuilder*>(abuilder);
+  auto batch = checked_cast<liborc::StructVectorBatch*>(cbatch);
+
+  const uint8_t* valid_bytes = nullptr;
+  if (batch->hasNulls) {
+    valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+  }
+  RETURN_NOT_OK(builder->AppendValues(length, valid_bytes));
+
+  for (int i = 0; i < builder->num_fields(); i++) {
+    RETURN_NOT_OK(AppendBatch(type->getSubtype(i), batch->fields[i], offset, length,
+                              builder->field_builder(i)));
+  }
+  return Status::OK();
+}
+
+Status AppendListBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<ListBuilder*>(abuilder);
+  auto batch = checked_cast<liborc::ListVectorBatch*>(cbatch);
+  liborc::ColumnVectorBatch* elements = batch->elements.get();
+  const liborc::Type* elemtype = type->getSubtype(0);
+
+  const bool has_nulls = batch->hasNulls;
+  for (int64_t i = offset; i < length + offset; i++) {
+    if (!has_nulls || batch->notNull[i]) {
+      int64_t start = batch->offsets[i];
+      int64_t end = batch->offsets[i + 1];
+      RETURN_NOT_OK(builder->Append());
+      RETURN_NOT_OK(
+          AppendBatch(elemtype, elements, start, end - start, builder->value_builder()));
+    } else {
+      RETURN_NOT_OK(builder->AppendNull());
+    }
+  }
+  return Status::OK();
+}
+
+Status AppendMapBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                      int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto list_builder = checked_cast<ListBuilder*>(abuilder);
+  auto struct_builder = checked_cast<StructBuilder*>(list_builder->value_builder());
+  auto batch = checked_cast<liborc::MapVectorBatch*>(cbatch);
+  liborc::ColumnVectorBatch* keys = batch->keys.get();
+  liborc::ColumnVectorBatch* vals = batch->elements.get();
+  const liborc::Type* keytype = type->getSubtype(0);
+  const liborc::Type* valtype = type->getSubtype(1);
+
+  const bool has_nulls = batch->hasNulls;
+  for (int64_t i = offset; i < length + offset; i++) {
+    RETURN_NOT_OK(list_builder->Append());
+    int64_t start = batch->offsets[i];
+    int64_t list_length = batch->offsets[i + 1] - start;
+    if (list_length && (!has_nulls || batch->notNull[i])) {
+      RETURN_NOT_OK(struct_builder->AppendValues(list_length, nullptr));
+      RETURN_NOT_OK(AppendBatch(keytype, keys, start, list_length,
+                                struct_builder->field_builder(0)));
+      RETURN_NOT_OK(AppendBatch(valtype, vals, start, list_length,
+                                struct_builder->field_builder(1)));
+    }
+  }
+  return Status::OK();
+}
+
+template <class builder_type, class batch_type, class elem_type>
+Status AppendNumericBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                          int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<builder_type*>(abuilder);
+  auto batch = checked_cast<batch_type*>(cbatch);
+
+  if (length == 0) {
+    return Status::OK();
+  }
+  const uint8_t* valid_bytes = nullptr;
+  if (batch->hasNulls) {
+    valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+  }
+  const elem_type* source = batch->data.data() + offset;
+  RETURN_NOT_OK(builder->AppendValues(source, length, valid_bytes));
+  return Status::OK();
+}
+
+template <class builder_type, class target_type, class batch_type, class source_type>
+Status AppendNumericBatchCast(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                              int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<builder_type*>(abuilder);
+  auto batch = checked_cast<batch_type*>(cbatch);
+
+  if (length == 0) {
+    return Status::OK();
+  }
+
+  const uint8_t* valid_bytes = nullptr;
+  if (batch->hasNulls) {
+    valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+  }
+  const source_type* source = batch->data.data() + offset;
+  auto cast_iter = internal::MakeLazyRange(
+      [&source](int64_t index) { return static_cast<target_type>(source[index]); },
+      length);
+
+  RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes));
+
+  return Status::OK();
+}
+
+Status AppendBoolBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset, int64_t length,
+                       ArrayBuilder* abuilder) {
+  auto builder = checked_cast<BooleanBuilder*>(abuilder);
+  auto batch = checked_cast<liborc::LongVectorBatch*>(cbatch);
+
+  if (length == 0) {
+    return Status::OK();
+  }
+
+  const uint8_t* valid_bytes = nullptr;
+  if (batch->hasNulls) {
+    valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+  }
+  const int64_t* source = batch->data.data() + offset;
+
+  auto cast_iter = internal::MakeLazyRange(
+      [&source](int64_t index) { return static_cast<bool>(source[index]); }, length);
+
+  RETURN_NOT_OK(builder->AppendValues(cast_iter.begin(), cast_iter.end(), valid_bytes));
+
+  return Status::OK();
+}
+
+Status AppendTimestampBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                            int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<TimestampBuilder*>(abuilder);
+  auto batch = checked_cast<liborc::TimestampVectorBatch*>(cbatch);
+
+  if (length == 0) {
+    return Status::OK();
+  }
+
+  const uint8_t* valid_bytes = nullptr;
+  if (batch->hasNulls) {
+    valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+  }
+
+  const int64_t* seconds = batch->data.data() + offset;
+  const int64_t* nanos = batch->nanoseconds.data() + offset;
+
+  auto transform_timestamp = [seconds, nanos](int64_t index) {
+    return seconds[index] * kOneSecondNanos + nanos[index];
+  };
+
+  auto transform_range = internal::MakeLazyRange(transform_timestamp, length);
+
+  RETURN_NOT_OK(
+      builder->AppendValues(transform_range.begin(), transform_range.end(), valid_bytes));
+  return Status::OK();
+}
+
+template <class builder_type>
+Status AppendBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                         int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<builder_type*>(abuilder);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+
+  const bool has_nulls = batch->hasNulls;
+  for (int64_t i = offset; i < length + offset; i++) {
+    if (!has_nulls || batch->notNull[i]) {
+      RETURN_NOT_OK(
+          builder->Append(batch->data[i], static_cast<int32_t>(batch->length[i])));
+    } else {
+      RETURN_NOT_OK(builder->AppendNull());
+    }
+  }
+  return Status::OK();
+}
+
+Status AppendFixedBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                              int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<FixedSizeBinaryBuilder*>(abuilder);
+  auto batch = checked_cast<liborc::StringVectorBatch*>(cbatch);
+
+  const bool has_nulls = batch->hasNulls;
+  for (int64_t i = offset; i < length + offset; i++) {
+    if (!has_nulls || batch->notNull[i]) {
+      RETURN_NOT_OK(builder->Append(batch->data[i]));
+    } else {
+      RETURN_NOT_OK(builder->AppendNull());
+    }
+  }
+  return Status::OK();
+}
+
+Status AppendDecimalBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                          int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+  auto builder = checked_cast<Decimal128Builder*>(abuilder);
+
+  const bool has_nulls = cbatch->hasNulls;
+  if (type->getPrecision() == 0 || type->getPrecision() > 18) {
+    auto batch = checked_cast<liborc::Decimal128VectorBatch*>(cbatch);
+    for (int64_t i = offset; i < length + offset; i++) {
+      if (!has_nulls || batch->notNull[i]) {
+        RETURN_NOT_OK(builder->Append(
+            Decimal128(batch->values[i].getHighBits(), batch->values[i].getLowBits())));
+      } else {
+        RETURN_NOT_OK(builder->AppendNull());
+      }
+    }
+  } else {
+    auto batch = checked_cast<liborc::Decimal64VectorBatch*>(cbatch);
+    for (int64_t i = offset; i < length + offset; i++) {
+      if (!has_nulls || batch->notNull[i]) {
+        RETURN_NOT_OK(builder->Append(Decimal128(batch->values[i])));
+      } else {
+        RETURN_NOT_OK(builder->AppendNull());
+      }
+    }
+  }
+  return Status::OK();
+}
+
+Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
+                   int64_t offset, int64_t length, ArrayBuilder* builder) {
+  if (type == nullptr) {
+    return Status::OK();
+  }
+  liborc::TypeKind kind = type->getKind();
+  switch (kind) {
+    case liborc::STRUCT:
+      return AppendStructBatch(type, batch, offset, length, builder);
+    case liborc::LIST:
+      return AppendListBatch(type, batch, offset, length, builder);
+    case liborc::MAP:
+      return AppendMapBatch(type, batch, offset, length, builder);
+    case liborc::LONG:
+      return AppendNumericBatch<Int64Builder, liborc::LongVectorBatch, int64_t>(
+          batch, offset, length, builder);
+    case liborc::INT:
+      return AppendNumericBatchCast<Int32Builder, int32_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::SHORT:
+      return AppendNumericBatchCast<Int16Builder, int16_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::BYTE:
+      return AppendNumericBatchCast<Int8Builder, int8_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::DOUBLE:
+      return AppendNumericBatch<DoubleBuilder, liborc::DoubleVectorBatch, double>(
+          batch, offset, length, builder);
+    case liborc::FLOAT:
+      return AppendNumericBatchCast<FloatBuilder, float, liborc::DoubleVectorBatch,
+                                    double>(batch, offset, length, builder);
+    case liborc::BOOLEAN:
+      return AppendBoolBatch(batch, offset, length, builder);
+    case liborc::VARCHAR:
+    case liborc::STRING:
+      return AppendBinaryBatch<StringBuilder>(batch, offset, length, builder);
+    case liborc::BINARY:
+      return AppendBinaryBatch<BinaryBuilder>(batch, offset, length, builder);
+    case liborc::CHAR:
+      return AppendFixedBinaryBatch(batch, offset, length, builder);
+    case liborc::DATE:
+      return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch,
+                                    int64_t>(batch, offset, length, builder);
+    case liborc::TIMESTAMP:
+      return AppendTimestampBatch(batch, offset, length, builder);
+    case liborc::DECIMAL:
+      return AppendDecimalBatch(type, batch, offset, length, builder);
+    default:
+      return Status::NotImplemented("Not implemented type kind: ", kind);
+  }
+}
+
+Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) {
+  // When subselecting fields on read, liborc will set some nodes to nullptr,
+  // so we need to check for nullptr before progressing
+  if (type == nullptr) {
+    *out = null();
+    return Status::OK();
+  }
+  liborc::TypeKind kind = type->getKind();
+  const int subtype_count = static_cast<int>(type->getSubtypeCount());
+
+  switch (kind) {
+    case liborc::BOOLEAN:
+      *out = boolean();
+      break;
+    case liborc::BYTE:
+      *out = int8();
+      break;
+    case liborc::SHORT:
+      *out = int16();
+      break;
+    case liborc::INT:
+      *out = int32();
+      break;
+    case liborc::LONG:
+      *out = int64();
+      break;
+    case liborc::FLOAT:
+      *out = float32();
+      break;
+    case liborc::DOUBLE:
+      *out = float64();
+      break;
+    case liborc::VARCHAR:
+    case liborc::STRING:
+      *out = utf8();
+      break;
+    case liborc::BINARY:
+      *out = binary();
+      break;
+    case liborc::CHAR:
+      *out = fixed_size_binary(static_cast<int>(type->getMaximumLength()));
+      break;
+    case liborc::TIMESTAMP:
+      *out = timestamp(TimeUnit::NANO);
+      break;
+    case liborc::DATE:
+      *out = date32();
+      break;
+    case liborc::DECIMAL: {
+      const int precision = static_cast<int>(type->getPrecision());
+      const int scale = static_cast<int>(type->getScale());
+      if (precision == 0) {
+        // In HIVE 0.11/0.12 precision is set as 0, but means max precision
+        *out = decimal(38, 6);
+      } else {
+        *out = decimal(precision, scale);
+      }
+      break;
+    }
+    case liborc::LIST: {
+      if (subtype_count != 1) {
+        return Status::Invalid("Invalid Orc List type");
+      }
+      std::shared_ptr<DataType> elemtype;
+      RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &elemtype));
+      *out = list(elemtype);
+      break;
+    }
+    case liborc::MAP: {
+      if (subtype_count != 2) {
+        return Status::Invalid("Invalid Orc Map type");
+      }
+      std::shared_ptr<DataType> keytype;
+      std::shared_ptr<DataType> valtype;
+      RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &keytype));
+      RETURN_NOT_OK(GetArrowType(type->getSubtype(1), &valtype));
+      *out = list(struct_({field("key", keytype), field("value", valtype)}));
+      break;
+    }
+    case liborc::STRUCT: {
+      std::vector<std::shared_ptr<Field>> fields;
+      for (int child = 0; child < subtype_count; ++child) {
+        std::shared_ptr<DataType> elemtype;
+        RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
+        std::string name = type->getFieldName(child);
+        fields.push_back(field(name, elemtype));
+      }
+      *out = struct_(fields);
+      break;
+    }
+    case liborc::UNION: {
+      std::vector<std::shared_ptr<Field>> fields;
+      std::vector<uint8_t> type_codes;
+      for (int child = 0; child < subtype_count; ++child) {
+        std::shared_ptr<DataType> elemtype;
+        RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
+        fields.push_back(field("_union_" + std::to_string(child), elemtype));
+        type_codes.push_back(static_cast<uint8_t>(child));
+      }
+      *out = union_(fields, type_codes);
+      break;
+    }
+    default: { return Status::Invalid("Unknown Orc type kind: ", kind); }
+  }
+  return Status::OK();
+}
+
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/adapter_util.h b/cpp/src/arrow/adapters/orc/adapter_util.h
new file mode 100644
index 0000000..eede230
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter_util.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_ADAPATER_UTIL_H
+#define ARROW_ADAPATER_UTIL_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/array/builder_base.h"
+#include "arrow/status.h"
+#include "orc/OrcFile.hh"
+
+namespace liborc = orc;
+
+namespace arrow {
+
+namespace adapters {
+
+namespace orc {
+
+Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out);
+
+Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
+                   int64_t offset, int64_t length, ArrayBuilder* builder);
+}  // namespace orc
+}  // namespace adapters
+}  // namespace arrow
+
+#endif  // ARROW_ADAPATER_UTIL_H