You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/01/04 15:11:17 UTC
[arrow] branch master updated: ARROW-1920 [C++/Python] Add
experimental reader for Apache ORC files
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 3a69efc ARROW-1920 [C++/Python] Add experimental reader for Apache ORC files
3a69efc is described below
commit 3a69efce59f807c60b8d218abf533e4f1b45ed62
Author: Jim Crist <ji...@gmail.com>
AuthorDate: Thu Jan 4 10:11:13 2018 -0500
ARROW-1920 [C++/Python] Add experimental reader for Apache ORC files
This adds support for reading ORC files in the C++ library, as well as python bindings for this functionality.
Author: Jim Crist <ji...@gmail.com>
Author: Uwe L. Korn <uw...@xhochy.com>
Closes #1418 from jcrist/orc-adapter and squashes the following commits:
7e0400eb [Jim Crist] lint
d6d32b50 [Uwe L. Korn] Hide symbols introduced by orc static lib
a2966406 [Jim Crist] Tweak error message
f45ac3dd [Jim Crist] Read reads as a table
57bc63dd [Jim Crist] Use `vector<int>` instead of `list<uint64_t>`
1d539276 [Jim Crist] date32 instead of date64
4b7a3a5d [Jim Crist] Add brief docs
e7835445 [Jim Crist] More fixups
33f5b10f [Jim Crist] Turn off ARROW_ORC on windows
86a23550 [Jim Crist] Cleanups
2cfdd924 [Jim Crist] Fix build when dependencies aren't already installed
876c3a36 [Jim Crist] Use fPIC on protobuf as well
f4a29f87 [Jim Crist] Ensure -fPIC on orc build
7cf1659e [Jim Crist] Build python orc support on travis
2adf938a [Jim Crist] Add ORC support
5c791046 [Jim Crist] Add cmake support for liborc
---
ci/travis_script_python.sh | 2 +-
cpp/CMakeLists.txt | 22 +
cpp/cmake_modules/FindProtobuf.cmake | 89 +++
cpp/cmake_modules/ThirdpartyToolchain.cmake | 73 +++
cpp/src/arrow/CMakeLists.txt | 5 +
.../src/arrow/adapters/orc/CMakeLists.txt | 15 +-
cpp/src/arrow/adapters/orc/adapter.cc | 697 +++++++++++++++++++++
cpp/src/arrow/adapters/orc/adapter.h | 105 ++++
cpp/src/arrow/symbols.map | 4 +
python/CMakeLists.txt | 11 +
.../manylinux1/scripts/check_arrow_visibility.sh | 5 +-
python/pyarrow/_orc.pxd | 50 ++
python/pyarrow/_orc.pyx | 111 ++++
python/pyarrow/orc.py | 149 +++++
python/setup.py | 9 +
15 files changed, 1337 insertions(+), 10 deletions(-)
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index 5f7b0a9..444386f 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -82,7 +82,7 @@ fi
export PYARROW_BUILD_TYPE=$ARROW_BUILD_TYPE
pip install -r requirements.txt
-python setup.py build_ext --with-parquet --with-plasma \
+python setup.py build_ext --with-parquet --with-plasma --with-orc \
install --single-version-externally-managed --record=record.text
popd
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index dd159ce..ede13af 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -119,6 +119,10 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
"Build the Arrow GPU extensions (requires CUDA installation)"
OFF)
+ option(ARROW_ORC
+ "Build the Arrow ORC adapter"
+ ON)
+
option(ARROW_JEMALLOC
"Build the Arrow jemalloc-based allocator"
OFF)
@@ -226,6 +230,17 @@ if(ARROW_BUILD_TESTS OR ARROW_BUILD_BENCHMARKS)
set(ARROW_WITH_ZSTD ON)
endif()
+if (MSVC)
+ # ORC doesn't build on windows
+ set(ARROW_ORC OFF)
+endif()
+
+if(ARROW_ORC)
+ set(ARROW_WITH_LZ4 ON)
+ set(ARROW_WITH_SNAPPY ON)
+ set(ARROW_WITH_ZLIB ON)
+endif()
+
if(NOT ARROW_BUILD_TESTS)
set(NO_TESTS 1)
endif()
@@ -526,6 +541,13 @@ if (ARROW_WITH_GRPC)
${ARROW_STATIC_LINK_LIBS})
endif()
+if (ARROW_ORC)
+ SET(ARROW_STATIC_LINK_LIBS
+ orc
+ protobuf
+ ${ARROW_STATIC_LINK_LIBS})
+endif()
+
if (ARROW_STATIC_LINK_LIBS)
add_dependencies(arrow_dependencies ${ARROW_STATIC_LINK_LIBS})
endif()
diff --git a/cpp/cmake_modules/FindProtobuf.cmake b/cpp/cmake_modules/FindProtobuf.cmake
new file mode 100644
index 0000000..a42f449
--- /dev/null
+++ b/cpp/cmake_modules/FindProtobuf.cmake
@@ -0,0 +1,89 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The PROTOBUF_HOME CMake variable is used to locate the Protobuf headers and static library
+
+# PROTOBUF_INCLUDE_DIR: directory containing headers
+# PROTOBUF_LIBS: directory containing Protobuf libraries
+# PROTOBUF_STATIC_LIB: location of protobuf.a
+# PROTOC_STATIC_LIB: location of protoc.a
+# PROTOBUF_EXECUTABLE: location of protoc
+# PROTOBUF_FOUND is set if Protobuf is found
+
+
+if( NOT "${PROTOBUF_HOME}" STREQUAL "")
+ file (TO_CMAKE_PATH "${PROTOBUF_HOME}" _protobuf_path)
+endif()
+
+message (STATUS "PROTOBUF_HOME: ${PROTOBUF_HOME}")
+
+find_path (PROTOBUF_INCLUDE_DIR google/protobuf/io/zero_copy_stream.h HINTS
+ ${_protobuf_path}
+ NO_DEFAULT_PATH
+ PATH_SUFFIXES "include")
+
+find_path (PROTOBUF_INCLUDE_DIR google/protobuf/io/coded_stream.h HINTS
+ ${_protobuf_path}
+ NO_DEFAULT_PATH
+ PATH_SUFFIXES "include")
+
+find_library (PROTOBUF_LIBRARY NAMES protobuf PATHS
+ ${_protobuf_path}
+ NO_DEFAULT_PATH
+ PATH_SUFFIXES "lib")
+
+find_library (PROTOC_LIBRARY NAMES protoc PATHS
+ ${_protobuf_path}
+ NO_DEFAULT_PATH
+ PATH_SUFFIXES "lib")
+
+find_program(PROTOBUF_EXECUTABLE protoc HINTS
+ ${_protobuf_path}
+ NO_DEFAULT_PATH
+ PATH_SUFFIXES "bin")
+
+if (PROTOBUF_INCLUDE_DIR AND PROTOBUF_LIBRARY AND PROTOC_LIBRARY AND PROTOBUF_EXECUTABLE)
+ set (PROTOBUF_FOUND TRUE)
+ get_filename_component (PROTOBUF_LIBS ${PROTOBUF_LIBRARY} PATH)
+ set (PROTOBUF_LIB_NAME protobuf)
+ set (PROTOC_LIB_NAME protoc)
+ set (PROTOBUF_STATIC_LIB ${PROTOBUF_LIBS}/${CMAKE_STATIC_LIBRARY_PREFIX}${PROTOBUF_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX})
+ set (PROTOC_STATIC_LIB ${PROTOBUF_LIBS}/${CMAKE_STATIC_LIBRARY_PREFIX}${PROTOC_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX})
+else ()
+ set (PROTOBUF_FOUND FALSE)
+endif ()
+
+# Report the outcome of the search. On failure the message is fatal only when
+# the caller asked for the package with REQUIRED.
+if (PROTOBUF_FOUND)
+  message (STATUS "Found the Protobuf headers: ${PROTOBUF_INCLUDE_DIR}")
+  message (STATUS "Found the Protobuf library: ${PROTOBUF_STATIC_LIB}")
+  message (STATUS "Found the Protoc library: ${PROTOC_STATIC_LIB}")
+  message (STATUS "Found the Protoc executable: ${PROTOBUF_EXECUTABLE}")
+else()
+  if (_protobuf_path)
+    set (PROTOBUF_ERR_MSG "Could not find Protobuf. Looked in ${_protobuf_path}.")
+  else ()
+    # Every find_* call in this module passes NO_DEFAULT_PATH, so without
+    # PROTOBUF_HOME nothing is searched at all; say so instead of claiming
+    # that the system search paths were inspected.
+    set (PROTOBUF_ERR_MSG "Could not find Protobuf: PROTOBUF_HOME is not set and system paths are not searched.")
+  endif()
+
+  if (Protobuf_FIND_REQUIRED)
+    message (FATAL_ERROR "${PROTOBUF_ERR_MSG}")
+  else ()
+    message (STATUS "${PROTOBUF_ERR_MSG}")
+  endif ()
+endif()
+
+# Hide the search results from the default cache view. The first four names
+# are the cache entries created by the find_path / find_library / find_program
+# calls in this module; the remaining names are kept from the original list.
+mark_as_advanced (
+  PROTOBUF_INCLUDE_DIR
+  PROTOBUF_LIBRARY
+  PROTOC_LIBRARY
+  PROTOBUF_EXECUTABLE
+  PROTOBUF_LIBS
+  PROTOBUF_STATIC_LIB
+  PROTOC_STATIC_LIB
+)
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index b706aab..4f64434 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -29,7 +29,9 @@ set(SNAPPY_VERSION "1.1.3")
set(BROTLI_VERSION "v0.6.0")
set(LZ4_VERSION "1.7.5")
set(ZSTD_VERSION "1.2.0")
+set(PROTOBUF_VERSION "2.6.0")
set(GRPC_VERSION "94582910ad7f82ad447ecc72e6548cb669e4f7a9") # v1.6.5
+set(ORC_VERSION "cf00b67795717ab3eb04e950780ed6d104109017")
string(TOUPPER ${CMAKE_BUILD_TYPE} UPPERCASE_BUILD_TYPE)
@@ -721,6 +723,7 @@ if (ARROW_WITH_LZ4)
if("${LZ4_HOME}" STREQUAL "")
set(LZ4_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/lz4_ep-prefix/src/lz4_ep")
+ set(LZ4_HOME "${LZ4_BUILD_DIR}")
set(LZ4_INCLUDE_DIR "${LZ4_BUILD_DIR}/lib")
if (MSVC)
@@ -865,3 +868,73 @@ if (ARROW_WITH_GRPC)
endif()
endif()
+
+if (ARROW_ORC)
+ # protobuf
+ if ("${PROTOBUF_HOME}" STREQUAL "")
+ set (PROTOBUF_PREFIX "${THIRDPARTY_DIR}/protobuf_ep-install")
+ set (PROTOBUF_HOME "${PROTOBUF_PREFIX}")
+ set (PROTOBUF_INCLUDE_DIR "${PROTOBUF_PREFIX}/include")
+ set (PROTOBUF_STATIC_LIB "${PROTOBUF_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}protobuf${CMAKE_STATIC_LIBRARY_SUFFIX}")
+ set (PROTOBUF_SRC_URL "https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/protobuf-${PROTOBUF_VERSION}.tar.gz")
+
+ ExternalProject_Add(protobuf_ep
+ CONFIGURE_COMMAND "./configure" "--disable-shared" "--prefix=${PROTOBUF_PREFIX}" "CXXFLAGS=${EP_CXX_FLAGS}"
+ BUILD_IN_SOURCE 1
+ URL ${PROTOBUF_SRC_URL}
+ LOG_DOWNLOAD 1
+ LOG_CONFIGURE 1
+ LOG_BUILD 1
+ LOG_INSTALL 1
+ BUILD_BYPRODUCTS "${PROTOBUF_STATIC_LIB}")
+
+ set (PROTOBUF_VENDORED 1)
+ else ()
+ find_package (Protobuf REQUIRED)
+ set (PROTOBUF_VENDORED 0)
+ endif ()
+
+ include_directories (SYSTEM ${PROTOBUF_INCLUDE_DIR})
+ ADD_THIRDPARTY_LIB(protobuf
+ STATIC_LIB ${PROTOBUF_STATIC_LIB})
+
+ if (PROTOBUF_VENDORED)
+ add_dependencies (protobuf protobuf_ep)
+ endif ()
+
+ # orc
+ set(ORC_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/orc_ep-install")
+ set(ORC_HOME "${ORC_PREFIX}")
+ set(ORC_INCLUDE_DIR "${ORC_PREFIX}/include")
+ set(ORC_STATIC_LIB "${ORC_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}orc${CMAKE_STATIC_LIBRARY_SUFFIX}")
+
+ # Since LZ4 isn't installed, the header file is in ${LZ4_HOME}/lib instead of
+ # ${LZ4_HOME}/include, which forces us to specify the include directory
+ # manually as well.
+ set (ORC_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
+ -DCMAKE_INSTALL_PREFIX=${ORC_PREFIX}
+ -DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}
+ -DBUILD_LIBHDFSPP=OFF
+ -DBUILD_JAVA=OFF
+ -DBUILD_TOOLS=OFF
+ -DBUILD_CPP_TESTS=OFF
+ -DINSTALL_VENDORED_LIBS=OFF
+ -DPROTOBUF_HOME=${PROTOBUF_HOME}
+ -DLZ4_HOME=${LZ4_HOME}
+ -DLZ4_INCLUDE_DIR=${LZ4_INCLUDE_DIR}
+ -DSNAPPY_HOME=${SNAPPY_HOME}
+ -DZLIB_HOME=${ZLIB_HOME})
+
+ ExternalProject_Add(orc_ep
+ GIT_REPOSITORY "https://github.com/apache/orc"
+ GIT_TAG ${ORC_VERSION}
+ BUILD_BYPRODUCTS ${ORC_STATIC_LIB}
+ CMAKE_ARGS ${ORC_CMAKE_ARGS})
+
+ include_directories(SYSTEM ${ORC_INCLUDE_DIR})
+ ADD_THIRDPARTY_LIB(orc
+ STATIC_LIB ${ORC_STATIC_LIB})
+
+ add_dependencies(orc_ep protobuf lz4_static snappy zlib)
+ add_dependencies(orc orc_ep)
+endif()
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index d645cca..ad86256 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -90,6 +90,11 @@ if (ARROW_WITH_ZSTD)
SET(ARROW_SRCS util/compression_zstd.cc ${ARROW_SRCS})
endif()
+if (ARROW_ORC)
+ add_subdirectory(adapters/orc)
+ SET(ARROW_SRCS adapters/orc/adapter.cc ${ARROW_SRCS})
+endif()
+
if (NOT ARROW_BOOST_HEADER_ONLY)
set(ARROW_SRCS ${ARROW_SRCS}
io/hdfs.cc
diff --git a/python/manylinux1/scripts/check_arrow_visibility.sh b/cpp/src/arrow/adapters/orc/CMakeLists.txt
old mode 100755
new mode 100644
similarity index 78%
copy from python/manylinux1/scripts/check_arrow_visibility.sh
copy to cpp/src/arrow/adapters/orc/CMakeLists.txt
index 27a30f7..eb7194c
--- a/python/manylinux1/scripts/check_arrow_visibility.sh
+++ b/cpp/src/arrow/adapters/orc/CMakeLists.txt
@@ -1,4 +1,3 @@
-#!/bin/bash -ex
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -16,11 +15,11 @@
# specific language governing permissions and limitations
# under the License.
-nm -D -C /arrow-dist/lib64/libarrow.so > nm_arrow.log
+#######################################
+# arrow_orc
+#######################################
-if [[ `grep ' T ' nm_arrow.log | grep -v arrow | wc -l` -eq 2 ]]
-then
- exit 0
-fi
-
-exit 1
+# Headers: top level
+install(FILES
+ adapter.h
+ DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/adapters/orc")
diff --git a/cpp/src/arrow/adapters/orc/adapter.cc b/cpp/src/arrow/adapters/orc/adapter.cc
new file mode 100644
index 0000000..473c90f
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter.cc
@@ -0,0 +1,697 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/adapters/orc/adapter.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/builder.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/table_builder.h"
+#include "arrow/type.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/decimal.h"
+#include "arrow/util/macros.h"
+#include "arrow/util/visibility.h"
+
+#include "orc/OrcFile.hh"
+
+// alias to not interfere with nested orc namespace
+namespace liborc = orc;
+
+namespace arrow {
+namespace adapters {
+namespace orc {
+
+#define ORC_THROW_NOT_OK(s) \
+ do { \
+ Status _s = (s); \
+ if (!_s.ok()) { \
+ std::stringstream ss; \
+ ss << "Arrow error: " << _s.ToString(); \
+ throw liborc::ParseError(ss.str()); \
+ } \
+ } while (0)
+
+class ArrowInputFile : public liborc::InputStream {
+ public:
+ explicit ArrowInputFile(const std::shared_ptr<io::ReadableFileInterface>& file)
+ : file_(file) {}
+
+ uint64_t getLength() const override {
+ int64_t size;
+ ORC_THROW_NOT_OK(file_->GetSize(&size));
+ return static_cast<uint64_t>(size);
+ }
+
+ uint64_t getNaturalReadSize() const override { return 128 * 1024; }
+
+ void read(void* buf, uint64_t length, uint64_t offset) override {
+ int64_t bytes_read;
+
+ ORC_THROW_NOT_OK(file_->ReadAt(offset, length, &bytes_read, buf));
+
+ if (static_cast<uint64_t>(bytes_read) != length) {
+ throw liborc::ParseError("Short read from arrow input file");
+ }
+ }
+
+ const std::string& getName() const override {
+ static const std::string filename("ArrowInputFile");
+ return filename;
+ }
+
+ private:
+ std::shared_ptr<io::ReadableFileInterface> file_;
+};
+
+struct StripeInformation {
+ uint64_t offset;
+ uint64_t length;
+ uint64_t num_rows;
+};
+
+Status GetArrowType(const liborc::Type* type, std::shared_ptr<DataType>* out) {
+ // When subselecting fields on read, liborc will set some nodes to nullptr,
+ // so we need to check for nullptr before progressing
+ if (type == nullptr) {
+ *out = null();
+ return Status::OK();
+ }
+ liborc::TypeKind kind = type->getKind();
+ switch (kind) {
+ case liborc::BOOLEAN:
+ *out = boolean();
+ break;
+ case liborc::BYTE:
+ *out = int8();
+ break;
+ case liborc::SHORT:
+ *out = int16();
+ break;
+ case liborc::INT:
+ *out = int32();
+ break;
+ case liborc::LONG:
+ *out = int64();
+ break;
+ case liborc::FLOAT:
+ *out = float32();
+ break;
+ case liborc::DOUBLE:
+ *out = float64();
+ break;
+ case liborc::VARCHAR:
+ case liborc::STRING:
+ *out = utf8();
+ break;
+ case liborc::BINARY:
+ *out = binary();
+ break;
+ case liborc::CHAR:
+ *out = fixed_size_binary(type->getMaximumLength());
+ break;
+ case liborc::TIMESTAMP:
+ *out = timestamp(TimeUnit::NANO);
+ break;
+ case liborc::DATE:
+ *out = date32();
+ break;
+ case liborc::DECIMAL: {
+ if (type->getPrecision() == 0) {
+ // In HIVE 0.11/0.12 precision is set as 0, but means max precision
+ *out = decimal(38, 6);
+ } else {
+ *out = decimal(type->getPrecision(), type->getScale());
+ }
+ break;
+ }
+ case liborc::LIST: {
+ if (type->getSubtypeCount() != 1) {
+ return Status::Invalid("Invalid Orc List type");
+ }
+ std::shared_ptr<DataType> elemtype;
+ RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &elemtype));
+ *out = list(elemtype);
+ break;
+ }
+ case liborc::MAP: {
+ if (type->getSubtypeCount() != 2) {
+ return Status::Invalid("Invalid Orc Map type");
+ }
+ std::shared_ptr<DataType> keytype;
+ std::shared_ptr<DataType> valtype;
+ RETURN_NOT_OK(GetArrowType(type->getSubtype(0), &keytype));
+ RETURN_NOT_OK(GetArrowType(type->getSubtype(1), &valtype));
+ *out = list(struct_({field("key", keytype), field("value", valtype)}));
+ break;
+ }
+ case liborc::STRUCT: {
+ int size = type->getSubtypeCount();
+ std::vector<std::shared_ptr<Field>> fields;
+ for (int child = 0; child < size; ++child) {
+ std::shared_ptr<DataType> elemtype;
+ RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
+ std::string name = type->getFieldName(child);
+ fields.push_back(field(name, elemtype));
+ }
+ *out = struct_(fields);
+ break;
+ }
+ case liborc::UNION: {
+ int size = type->getSubtypeCount();
+ std::vector<std::shared_ptr<Field>> fields;
+ std::vector<uint8_t> type_codes;
+ for (int child = 0; child < size; ++child) {
+ std::shared_ptr<DataType> elemtype;
+ RETURN_NOT_OK(GetArrowType(type->getSubtype(child), &elemtype));
+ fields.push_back(field("_union_" + std::to_string(child), elemtype));
+ type_codes.push_back(static_cast<uint8_t>(child));
+ }
+ *out = union_(fields, type_codes);
+ break;
+ }
+ default: {
+ std::stringstream ss;
+ ss << "Unknown Orc type kind: " << kind;
+ return Status::Invalid(ss.str());
+ }
+ }
+ return Status::OK();
+}
+
+// The number of rows to read in a ColumnVectorBatch
+constexpr int64_t kReadRowsBatch = 1000;
+
+// The number of nanoseconds in a second
+constexpr int64_t kOneSecondNanos = 1000000000LL;
+
+class ORCFileReader::Impl {
+ public:
+ Impl() {}
+ ~Impl() {}
+
+ Status Open(const std::shared_ptr<io::ReadableFileInterface>& file, MemoryPool* pool) {
+ std::unique_ptr<ArrowInputFile> io_wrapper(new ArrowInputFile(file));
+ liborc::ReaderOptions options;
+ std::unique_ptr<liborc::Reader> liborc_reader;
+ try {
+ liborc_reader = createReader(std::move(io_wrapper), options);
+ } catch (const liborc::ParseError& e) {
+ return Status::IOError(e.what());
+ }
+ pool_ = pool;
+ reader_ = std::move(liborc_reader);
+
+ return Init();
+ }
+
+ // Cache the offset, length and row count of every stripe reported by reader_.
+ //
+ // Open() assigns reader_ before calling this, so reader_ is dereferenced
+ // without a null check.
+ Status Init() {
+   const int64_t nstripes = reader_->getNumberOfStripes();
+   stripes_.resize(nstripes);
+   // The counter has the same 64-bit width as the stripe count so the
+   // comparison can never involve an overflowed `int`.
+   for (int64_t i = 0; i < nstripes; ++i) {
+     std::unique_ptr<liborc::StripeInformation> stripe = reader_->getStripe(i);
+     stripes_[i] = StripeInformation(
+         {stripe->getOffset(), stripe->getLength(), stripe->getNumberOfRows()});
+   }
+   return Status::OK();
+ }
+
+ int64_t NumberOfStripes() { return stripes_.size(); }
+
+ int64_t NumberOfRows() { return reader_->getNumberOfRows(); }
+
+ Status ReadSchema(std::shared_ptr<Schema>* out) {
+ const liborc::Type& type = reader_->getType();
+ return GetArrowSchema(type, out);
+ }
+
+ Status GetArrowSchema(const liborc::Type& type, std::shared_ptr<Schema>* out) {
+ if (type.getKind() != liborc::STRUCT) {
+ return Status::NotImplemented(
+ "Only ORC files with a top-level struct "
+ "can be handled");
+ }
+ int size = type.getSubtypeCount();
+ std::vector<std::shared_ptr<Field>> fields;
+ for (int child = 0; child < size; ++child) {
+ std::shared_ptr<DataType> elemtype;
+ RETURN_NOT_OK(GetArrowType(type.getSubtype(child), &elemtype));
+ std::string name = type.getFieldName(child);
+ fields.push_back(field(name, elemtype));
+ }
+ std::list<std::string> keys = reader_->getMetadataKeys();
+ std::shared_ptr<KeyValueMetadata> metadata;
+ if (!keys.empty()) {
+ metadata = std::make_shared<KeyValueMetadata>();
+ for (auto it = keys.begin(); it != keys.end(); ++it) {
+ metadata->Append(*it, reader_->getMetadataValue(*it));
+ }
+ }
+
+ *out = std::make_shared<Schema>(fields, metadata);
+ return Status::OK();
+ }
+
+ Status Read(std::shared_ptr<Table>* out) {
+ liborc::RowReaderOptions opts;
+ return ReadTable(opts, out);
+ }
+
+ Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out) {
+ liborc::RowReaderOptions opts;
+ RETURN_NOT_OK(SelectIndices(&opts, include_indices));
+ return ReadTable(opts, out);
+ }
+
+ Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
+ liborc::RowReaderOptions opts;
+ RETURN_NOT_OK(SelectStripe(&opts, stripe));
+ return ReadBatch(opts, stripes_[stripe].num_rows, out);
+ }
+
+ Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
+ std::shared_ptr<RecordBatch>* out) {
+ liborc::RowReaderOptions opts;
+ RETURN_NOT_OK(SelectIndices(&opts, include_indices));
+ RETURN_NOT_OK(SelectStripe(&opts, stripe));
+ return ReadBatch(opts, stripes_[stripe].num_rows, out);
+ }
+
+ Status SelectStripe(liborc::RowReaderOptions* opts, int64_t stripe) {
+ if (stripe < 0 || stripe >= NumberOfStripes()) {
+ std::stringstream ss;
+ ss << "Out of bounds stripe: " << stripe;
+ return Status::Invalid(ss.str());
+ }
+ opts->range(stripes_[stripe].offset, stripes_[stripe].length);
+ return Status::OK();
+ }
+
+ Status SelectIndices(liborc::RowReaderOptions* opts,
+ const std::vector<int>& include_indices) {
+ std::list<uint64_t> include_indices_list;
+ for (auto it = include_indices.begin(); it != include_indices.end(); ++it) {
+ if (*it < 0) {
+ return Status::Invalid("Negative field index");
+ }
+ include_indices_list.push_back(*it);
+ }
+ opts->includeTypes(include_indices_list);
+ return Status::OK();
+ }
+
+ Status ReadTable(const liborc::RowReaderOptions& row_opts,
+ std::shared_ptr<Table>* out) {
+ liborc::RowReaderOptions opts(row_opts);
+ std::vector<std::shared_ptr<RecordBatch>> batches(stripes_.size());
+ for (size_t stripe = 0; stripe < stripes_.size(); stripe++) {
+ opts.range(stripes_[stripe].offset, stripes_[stripe].length);
+ RETURN_NOT_OK(ReadBatch(opts, stripes_[stripe].num_rows, &batches[stripe]));
+ }
+ return Table::FromRecordBatches(batches, out);
+ }
+
+ // Read the rows selected by `opts` into a single RecordBatch.
+ //
+ // \param[in] opts liborc row-reader options used to create the row reader
+ // \param[in] nrows row count hint: bounds the liborc read batch size (at most
+ //   kReadRowsBatch) and is forwarded to RecordBatchBuilder::Make
+ // \param[out] out the assembled RecordBatch
+ // \return Status::Invalid if liborc throws a ParseError; otherwise the first
+ //   non-OK Status from schema conversion, appending or flushing
+ Status ReadBatch(const liborc::RowReaderOptions& opts, int64_t nrows,
+                  std::shared_ptr<RecordBatch>* out) {
+   std::unique_ptr<liborc::RowReader> rowreader;
+   std::unique_ptr<liborc::ColumnVectorBatch> batch;
+   try {
+     rowreader = reader_->createRowReader(opts);
+     batch = rowreader->createRowBatch(std::min(nrows, kReadRowsBatch));
+   } catch (const liborc::ParseError& e) {
+     return Status::Invalid(e.what());
+   }
+   const liborc::Type& type = rowreader->getSelectedType();
+   std::shared_ptr<Schema> schema;
+   RETURN_NOT_OK(GetArrowSchema(type, &schema));
+
+   std::unique_ptr<RecordBatchBuilder> builder;
+   RETURN_NOT_OK(RecordBatchBuilder::Make(schema, pool_, nrows, &builder));
+
+   // The top-level type must be a struct to read into an arrow table
+   const auto& struct_batch = static_cast<liborc::StructVectorBatch&>(*batch);
+
+   // ArrowInputFile::read() reports I/O failures and short reads by throwing
+   // liborc::ParseError. Decoding is presumably where liborc invokes it, so
+   // the read loop is guarded to keep the exception from escaping this
+   // Status-returning API.
+   try {
+     while (rowreader->next(*batch)) {
+       for (int i = 0; i < builder->num_fields(); i++) {
+         RETURN_NOT_OK(AppendBatch(type.getSubtype(i), struct_batch.fields[i], 0,
+                                   batch->numElements, builder->GetField(i)));
+       }
+     }
+   } catch (const liborc::ParseError& e) {
+     return Status::Invalid(e.what());
+   }
+   RETURN_NOT_OK(builder->Flush(out));
+   return Status::OK();
+ }
+
+ Status AppendBatch(const liborc::Type* type, liborc::ColumnVectorBatch* batch,
+ int64_t offset, int64_t length, ArrayBuilder* builder) {
+ if (type == nullptr) {
+ return Status::OK();
+ }
+ liborc::TypeKind kind = type->getKind();
+ switch (kind) {
+ case liborc::STRUCT:
+ return AppendStructBatch(type, batch, offset, length, builder);
+ case liborc::LIST:
+ return AppendListBatch(type, batch, offset, length, builder);
+ case liborc::MAP:
+ return AppendMapBatch(type, batch, offset, length, builder);
+ case liborc::LONG:
+ return AppendNumericBatch<Int64Builder, liborc::LongVectorBatch, int64_t>(
+ batch, offset, length, builder);
+ case liborc::INT:
+ return AppendNumericBatchCast<Int32Builder, int32_t, liborc::LongVectorBatch,
+ int64_t>(batch, offset, length, builder);
+ case liborc::SHORT:
+ return AppendNumericBatchCast<Int16Builder, int16_t, liborc::LongVectorBatch,
+ int64_t>(batch, offset, length, builder);
+ case liborc::BYTE:
+ return AppendNumericBatchCast<Int8Builder, int8_t, liborc::LongVectorBatch,
+ int64_t>(batch, offset, length, builder);
+ case liborc::DOUBLE:
+ return AppendNumericBatch<DoubleBuilder, liborc::DoubleVectorBatch, double>(
+ batch, offset, length, builder);
+ case liborc::FLOAT:
+ return AppendNumericBatchCast<FloatBuilder, float, liborc::DoubleVectorBatch,
+ double>(batch, offset, length, builder);
+ case liborc::BOOLEAN:
+ return AppendBoolBatch(batch, offset, length, builder);
+ case liborc::VARCHAR:
+ case liborc::STRING:
+ return AppendBinaryBatch<StringBuilder>(batch, offset, length, builder);
+ case liborc::BINARY:
+ return AppendBinaryBatch<BinaryBuilder>(batch, offset, length, builder);
+ case liborc::CHAR:
+ return AppendFixedBinaryBatch(batch, offset, length, builder);
+ case liborc::DATE:
+ return AppendNumericBatchCast<Date32Builder, int32_t, liborc::LongVectorBatch,
+ int64_t>(batch, offset, length, builder);
+ case liborc::TIMESTAMP:
+ return AppendTimestampBatch(batch, offset, length, builder);
+ case liborc::DECIMAL:
+ return AppendDecimalBatch(type, batch, offset, length, builder);
+ default:
+ std::stringstream ss;
+ ss << "Not implemented type kind: " << kind;
+ return Status::NotImplemented(ss.str());
+ }
+ }
+
+ Status AppendStructBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+ int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+ auto builder = static_cast<StructBuilder*>(abuilder);
+ auto batch = static_cast<liborc::StructVectorBatch*>(cbatch);
+
+ const uint8_t* valid_bytes = nullptr;
+ if (batch->hasNulls) {
+ valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+ }
+ RETURN_NOT_OK(builder->Append(length, valid_bytes));
+
+ for (int i = 0; i < builder->num_fields(); i++) {
+ RETURN_NOT_OK(AppendBatch(type->getSubtype(i), batch->fields[i], offset, length,
+ builder->field_builder(i)));
+ }
+ return Status::OK();
+ }
+
+ // Append `length` list entries, starting at row `offset`, from an ORC list
+ // batch to `abuilder`.
+ //
+ // `cbatch` is cast to liborc::ListVectorBatch and `abuilder` to ListBuilder
+ // without a runtime check. A null row is appended as a null list; for any
+ // other row the child elements in [offsets[i], offsets[i + 1]) are appended
+ // to the list's value builder.
+ Status AppendListBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                        int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+   auto builder = static_cast<ListBuilder*>(abuilder);
+   auto batch = static_cast<liborc::ListVectorBatch*>(cbatch);
+   liborc::ColumnVectorBatch* elements = batch->elements.get();
+   const liborc::Type* elemtype = type->getSubtype(0);
+
+   const bool has_nulls = batch->hasNulls;
+   // 64-bit index: `offset` and `length` are int64_t and must not be narrowed.
+   for (int64_t i = offset; i < length + offset; i++) {
+     if (!has_nulls || batch->notNull[i]) {
+       int64_t start = batch->offsets[i];
+       int64_t end = batch->offsets[i + 1];
+       RETURN_NOT_OK(builder->Append());
+       RETURN_NOT_OK(AppendBatch(elemtype, elements, start, end - start,
+                                 builder->value_builder()));
+     } else {
+       RETURN_NOT_OK(builder->AppendNull());
+     }
+   }
+   return Status::OK();
+ }
+
+ // Append `length` map entries, starting at row `offset`, from an ORC map
+ // batch to `abuilder`.
+ //
+ // `abuilder` is cast to a ListBuilder whose value builder is a StructBuilder
+ // with the key builder at field 0 and the value builder at field 1.
+ //
+ // NOTE(review): list_builder->Append() is called for every row, so a null
+ // map row is emitted the same way as an empty map rather than through
+ // AppendNull() -- confirm this is intended.
+ Status AppendMapBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                       int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+   auto list_builder = static_cast<ListBuilder*>(abuilder);
+   auto struct_builder = static_cast<StructBuilder*>(list_builder->value_builder());
+   auto batch = static_cast<liborc::MapVectorBatch*>(cbatch);
+   liborc::ColumnVectorBatch* keys = batch->keys.get();
+   liborc::ColumnVectorBatch* vals = batch->elements.get();
+   const liborc::Type* keytype = type->getSubtype(0);
+   const liborc::Type* valtype = type->getSubtype(1);
+
+   const bool has_nulls = batch->hasNulls;
+   // 64-bit index: `offset` and `length` are int64_t and must not be narrowed.
+   for (int64_t i = offset; i < length + offset; i++) {
+     RETURN_NOT_OK(list_builder->Append());
+     int64_t start = batch->offsets[i];
+     int64_t list_length = batch->offsets[i + 1] - start;
+     // Children are only appended for non-empty, non-null rows.
+     if (list_length && (!has_nulls || batch->notNull[i])) {
+       RETURN_NOT_OK(struct_builder->Append(list_length, nullptr));
+       RETURN_NOT_OK(AppendBatch(keytype, keys, start, list_length,
+                                 struct_builder->field_builder(0)));
+       RETURN_NOT_OK(AppendBatch(valtype, vals, start, list_length,
+                                 struct_builder->field_builder(1)));
+     }
+   }
+   return Status::OK();
+ }
+
+ template <class builder_type, class batch_type, class elem_type>
+ Status AppendNumericBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+ int64_t length, ArrayBuilder* abuilder) {
+ auto builder = static_cast<builder_type*>(abuilder);
+ auto batch = static_cast<batch_type*>(cbatch);
+
+ if (length == 0) {
+ return Status::OK();
+ }
+ const uint8_t* valid_bytes = nullptr;
+ if (batch->hasNulls) {
+ valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+ }
+ const elem_type* source = batch->data.data() + offset;
+ RETURN_NOT_OK(builder->Append(source, length, valid_bytes));
+ return Status::OK();
+ }
+
+ // Append `length` numeric values, starting at row `offset`, converting each
+ // element from `source_type` to `target_type`.
+ //
+ // `cbatch` is cast to `batch_type` and `abuilder` to `builder_type` without a
+ // runtime check. AppendNulls(valid_bytes, length) is called first, then the
+ // converted values are copied directly into the builder's data buffer
+ // starting at the builder's previous length.
+ template <class builder_type, class target_type, class batch_type, class source_type>
+ Status AppendNumericBatchCast(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                               int64_t length, ArrayBuilder* abuilder) {
+   auto builder = static_cast<builder_type*>(abuilder);
+   auto batch = static_cast<batch_type*>(cbatch);
+
+   if (length == 0) {
+     return Status::OK();
+   }
+   // Kept as int64_t: this is used as a write offset into the data buffer, so
+   // truncating it to `int` would corrupt memory once the builder grows large.
+   const int64_t start = builder->length();
+
+   const uint8_t* valid_bytes = nullptr;
+   if (batch->hasNulls) {
+     valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+   }
+   RETURN_NOT_OK(builder->AppendNulls(valid_bytes, length));
+
+   const source_type* source = batch->data.data() + offset;
+   target_type* target = reinterpret_cast<target_type*>(builder->data()->mutable_data());
+
+   std::copy(source, source + length, target + start);
+
+   return Status::OK();
+ }
+
+ // Append `length` boolean values, starting at row `offset`, to `abuilder`.
+ //
+ // `cbatch` is cast to liborc::LongVectorBatch and `abuilder` to
+ // BooleanBuilder without a runtime check. AppendNulls(valid_bytes, length)
+ // is called first, then one bit per row is written into the builder's data
+ // buffer: set for a non-zero source value, cleared otherwise.
+ Status AppendBoolBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                        int64_t length, ArrayBuilder* abuilder) {
+   auto builder = static_cast<BooleanBuilder*>(abuilder);
+   auto batch = static_cast<liborc::LongVectorBatch*>(cbatch);
+
+   if (length == 0) {
+     return Status::OK();
+   }
+   // Kept as int64_t so the bit position is not truncated for large builders.
+   const int64_t start = builder->length();
+
+   const uint8_t* valid_bytes = nullptr;
+   if (batch->hasNulls) {
+     valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+   }
+   RETURN_NOT_OK(builder->AppendNulls(valid_bytes, length));
+
+   const int64_t* source = batch->data.data() + offset;
+   uint8_t* target = reinterpret_cast<uint8_t*>(builder->data()->mutable_data());
+
+   for (int64_t i = 0; i < length; i++) {
+     if (source[i]) {
+       BitUtil::SetBit(target, start + i);
+     } else {
+       BitUtil::ClearBit(target, start + i);
+     }
+   }
+   return Status::OK();
+ }
+
+ // Append `length` timestamps, starting at row `offset`, to `abuilder`.
+ //
+ // `cbatch` is cast to liborc::TimestampVectorBatch and `abuilder` to
+ // TimestampBuilder without a runtime check. Each output value is
+ // seconds * kOneSecondNanos + nanoseconds, written directly into the
+ // builder's data buffer after AppendNulls(valid_bytes, length).
+ Status AppendTimestampBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                             int64_t length, ArrayBuilder* abuilder) {
+   auto builder = static_cast<TimestampBuilder*>(abuilder);
+   auto batch = static_cast<liborc::TimestampVectorBatch*>(cbatch);
+
+   if (length == 0) {
+     return Status::OK();
+   }
+   // Kept as int64_t so the write offset is not truncated for large builders.
+   const int64_t start = builder->length();
+
+   const uint8_t* valid_bytes = nullptr;
+   if (batch->hasNulls) {
+     valid_bytes = reinterpret_cast<const uint8_t*>(batch->notNull.data()) + offset;
+   }
+   RETURN_NOT_OK(builder->AppendNulls(valid_bytes, length));
+
+   const int64_t* seconds = batch->data.data() + offset;
+   const int64_t* nanos = batch->nanoseconds.data() + offset;
+   int64_t* target = reinterpret_cast<int64_t*>(builder->data()->mutable_data());
+
+   for (int64_t i = 0; i < length; i++) {
+     // TODO: boundscheck this, as ORC supports higher resolution timestamps
+     // than arrow for nanosecond resolution
+     target[start + i] = seconds[i] * kOneSecondNanos + nanos[i];
+   }
+   return Status::OK();
+ }
+
+ // Append `length` variable-length values, starting at row `offset`, to
+ // `abuilder`.
+ //
+ // `cbatch` is cast to liborc::StringVectorBatch and `abuilder` to
+ // `builder_type` without a runtime check. Null rows are appended with
+ // AppendNull(); other rows append data[i] with length[i].
+ template <class builder_type>
+ Status AppendBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                          int64_t length, ArrayBuilder* abuilder) {
+   auto builder = static_cast<builder_type*>(abuilder);
+   auto batch = static_cast<liborc::StringVectorBatch*>(cbatch);
+
+   const bool has_nulls = batch->hasNulls;
+   // 64-bit index: `offset` and `length` are int64_t and must not be narrowed.
+   for (int64_t i = offset; i < length + offset; i++) {
+     if (!has_nulls || batch->notNull[i]) {
+       RETURN_NOT_OK(builder->Append(batch->data[i], batch->length[i]));
+     } else {
+       RETURN_NOT_OK(builder->AppendNull());
+     }
+   }
+   return Status::OK();
+ }
+
+ // Append `length` fixed-width values, starting at row `offset`, to
+ // `abuilder`.
+ //
+ // `cbatch` is cast to liborc::StringVectorBatch and `abuilder` to
+ // FixedSizeBinaryBuilder without a runtime check. Only the data pointer of
+ // each row is passed to Append(); the per-row length is not consulted here.
+ Status AppendFixedBinaryBatch(liborc::ColumnVectorBatch* cbatch, int64_t offset,
+                               int64_t length, ArrayBuilder* abuilder) {
+   auto builder = static_cast<FixedSizeBinaryBuilder*>(abuilder);
+   auto batch = static_cast<liborc::StringVectorBatch*>(cbatch);
+
+   const bool has_nulls = batch->hasNulls;
+   // 64-bit index: `offset` and `length` are int64_t and must not be narrowed.
+   for (int64_t i = offset; i < length + offset; i++) {
+     if (!has_nulls || batch->notNull[i]) {
+       RETURN_NOT_OK(builder->Append(batch->data[i]));
+     } else {
+       RETURN_NOT_OK(builder->AppendNull());
+     }
+   }
+   return Status::OK();
+ }
+
+ // Append `length` decimal values, starting at row `offset`, to `abuilder`.
+ //
+ // `abuilder` is cast to Decimal128Builder without a runtime check. The ORC
+ // batch layout is chosen from the type's precision: a precision of 0 or
+ // above 18 is read as liborc::Decimal128VectorBatch, anything else as
+ // liborc::Decimal64VectorBatch.
+ Status AppendDecimalBatch(const liborc::Type* type, liborc::ColumnVectorBatch* cbatch,
+                           int64_t offset, int64_t length, ArrayBuilder* abuilder) {
+   auto builder = static_cast<Decimal128Builder*>(abuilder);
+
+   const bool has_nulls = cbatch->hasNulls;
+   // Both loops use a 64-bit index because `offset` and `length` are int64_t.
+   if (type->getPrecision() == 0 || type->getPrecision() > 18) {
+     auto batch = static_cast<liborc::Decimal128VectorBatch*>(cbatch);
+     for (int64_t i = offset; i < length + offset; i++) {
+       if (!has_nulls || batch->notNull[i]) {
+         RETURN_NOT_OK(builder->Append(
+             Decimal128(batch->values[i].getHighBits(), batch->values[i].getLowBits())));
+       } else {
+         RETURN_NOT_OK(builder->AppendNull());
+       }
+     }
+   } else {
+     auto batch = static_cast<liborc::Decimal64VectorBatch*>(cbatch);
+     for (int64_t i = offset; i < length + offset; i++) {
+       if (!has_nulls || batch->notNull[i]) {
+         RETURN_NOT_OK(builder->Append(Decimal128(batch->values[i])));
+       } else {
+         RETURN_NOT_OK(builder->AppendNull());
+       }
+     }
+   }
+   return Status::OK();
+ }
+
+ private:
+ MemoryPool* pool_;
+ std::unique_ptr<liborc::Reader> reader_;
+ std::vector<StripeInformation> stripes_;
+};
+
+ORCFileReader::ORCFileReader() { impl_.reset(new ORCFileReader::Impl()); }
+
+ORCFileReader::~ORCFileReader() {}
+
+Status ORCFileReader::Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+ MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader) {
+ auto result = std::unique_ptr<ORCFileReader>(new ORCFileReader());
+ RETURN_NOT_OK(result->impl_->Open(file, pool));
+ *reader = std::move(result);
+ return Status::OK();
+}
+
+Status ORCFileReader::ReadSchema(std::shared_ptr<Schema>* out) {
+ return impl_->ReadSchema(out);
+}
+
+Status ORCFileReader::Read(std::shared_ptr<Table>* out) { return impl_->Read(out); }
+
+Status ORCFileReader::Read(const std::vector<int>& include_indices,
+ std::shared_ptr<Table>* out) {
+ return impl_->Read(include_indices, out);
+}
+
+Status ORCFileReader::ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out) {
+ return impl_->ReadStripe(stripe, out);
+}
+
+Status ORCFileReader::ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
+ std::shared_ptr<RecordBatch>* out) {
+ return impl_->ReadStripe(stripe, include_indices, out);
+}
+
+int64_t ORCFileReader::NumberOfStripes() { return impl_->NumberOfStripes(); }
+
+int64_t ORCFileReader::NumberOfRows() { return impl_->NumberOfRows(); }
+
+} // namespace orc
+} // namespace adapters
+} // namespace arrow
diff --git a/cpp/src/arrow/adapters/orc/adapter.h b/cpp/src/arrow/adapters/orc/adapter.h
new file mode 100644
index 0000000..6438658
--- /dev/null
+++ b/cpp/src/arrow/adapters/orc/adapter.h
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_ORC_CONVERTER_H
+#define ARROW_ORC_CONVERTER_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/visibility.h"
+
+namespace arrow {
+
+namespace adapters {
+
+namespace orc {
+
+/// \class ORCFileReader
+/// \brief Read an Arrow Table or RecordBatch from an ORC file.
+class ARROW_EXPORT ORCFileReader {
+ public:
+ ~ORCFileReader();
+
+ /// \brief Create a new ORC reader
+ ///
+ /// \param[in] file the data source
+ /// \param[in] pool a MemoryPool to use for buffer allocations
+ /// \param[out] reader the returned reader object
+ /// \return Status
+ static Status Open(const std::shared_ptr<io::ReadableFileInterface>& file,
+ MemoryPool* pool, std::unique_ptr<ORCFileReader>* reader);
+
+ /// \brief Return the schema read from the ORC file
+ ///
+ /// \param[out] out the returned Schema object
+ Status ReadSchema(std::shared_ptr<Schema>* out);
+
+ /// \brief Read the file as a Table
+ ///
+ /// The table will be composed of one record batch per stripe.
+ ///
+ /// \param[out] out the returned Table
+ Status Read(std::shared_ptr<Table>* out);
+
+ /// \brief Read the file as a Table
+ ///
+ /// The table will be composed of one record batch per stripe.
+ ///
+ /// \param[in] include_indices the selected field indices to read
+ /// \param[out] out the returned Table
+ Status Read(const std::vector<int>& include_indices, std::shared_ptr<Table>* out);
+
+ /// \brief Read a single stripe as a RecordBatch
+ ///
+ /// \param[in] stripe the stripe index
+ /// \param[out] out the returned RecordBatch
+ Status ReadStripe(int64_t stripe, std::shared_ptr<RecordBatch>* out);
+
+ /// \brief Read a single stripe as a RecordBatch
+ ///
+ /// \param[in] stripe the stripe index
+ /// \param[in] include_indices the selected field indices to read
+ /// \param[out] out the returned RecordBatch
+ Status ReadStripe(int64_t stripe, const std::vector<int>& include_indices,
+ std::shared_ptr<RecordBatch>* out);
+
+ /// \brief The number of stripes in the file
+ int64_t NumberOfStripes();
+
+ /// \brief The number of rows in the file
+ int64_t NumberOfRows();
+
+ private:
+ class Impl;
+ std::unique_ptr<Impl> impl_;
+ ORCFileReader();
+};
+
+} // namespace orc
+
+} // namespace adapters
+
+} // namespace arrow
+
+#endif // ARROW_ORC_CONVERTER_H
diff --git a/cpp/src/arrow/symbols.map b/cpp/src/arrow/symbols.map
index f216d86..c5d2379 100644
--- a/cpp/src/arrow/symbols.map
+++ b/cpp/src/arrow/symbols.map
@@ -55,6 +55,8 @@
ERR_getErrorString;
# jemalloc
je_arrow_*;
+ # ORC destructors
+ _ZThn8_N3orc*;
extern "C++" {
# devtoolset or -static-libstdc++ - the Red Hat devtoolset statically
@@ -65,6 +67,8 @@
# Statically linked C++ dependencies
boost::*;
+ google::*;
+ orc::*;
snappy::*;
};
};
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index cbbb464..e9de08b 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -70,6 +70,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
option(PYARROW_BUILD_PLASMA
"Build the PyArrow Plasma integration"
OFF)
+ option(PYARROW_BUILD_ORC
+ "Build the PyArrow ORC integration"
+ OFF)
option(PYARROW_BUNDLE_ARROW_CPP
"Bundle the Arrow C++ libraries"
OFF)
@@ -357,6 +360,14 @@ if (PYARROW_BUILD_PLASMA)
file(COPY ${PLASMA_EXECUTABLE} DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY})
endif()
+
+if (PYARROW_BUILD_ORC)
+ ## ORC
+ set(CYTHON_EXTENSIONS
+ ${CYTHON_EXTENSIONS}
+ _orc)
+endif()
+
############################################################
# Setup and build Cython modules
############################################################
diff --git a/python/manylinux1/scripts/check_arrow_visibility.sh b/python/manylinux1/scripts/check_arrow_visibility.sh
index 27a30f7..bed357e 100755
--- a/python/manylinux1/scripts/check_arrow_visibility.sh
+++ b/python/manylinux1/scripts/check_arrow_visibility.sh
@@ -17,10 +17,13 @@
# under the License.
nm -D -C /arrow-dist/lib64/libarrow.so > nm_arrow.log
+grep ' T ' nm_arrow.log | grep -v arrow > visible_symbols.log
-if [[ `grep ' T ' nm_arrow.log | grep -v arrow | wc -l` -eq 2 ]]
+if [[ `cat visible_symbols.log | wc -l` -eq 2 ]]
then
exit 0
fi
+cat visible_symbols.log
+
exit 1
diff --git a/python/pyarrow/_orc.pxd b/python/pyarrow/_orc.pxd
new file mode 100644
index 0000000..4116915
--- /dev/null
+++ b/python/pyarrow/_orc.pxd
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from libc.string cimport const_char
+from libcpp.vector cimport vector as std_vector
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
+ CTable, CMemoryPool,
+ CKeyValueMetadata,
+ CRecordBatch,
+ CTable,
+ RandomAccessFile, OutputStream,
+ TimeUnit)
+
+
+cdef extern from "arrow/adapters/orc/adapter.h" namespace "arrow::adapters::orc" nogil:
+ cdef cppclass ORCFileReader:
+
+ @staticmethod
+ CStatus Open(const shared_ptr[RandomAccessFile]& file,
+ CMemoryPool* pool,
+ unique_ptr[ORCFileReader]* reader)
+
+ CStatus ReadSchema(shared_ptr[CSchema]* out)
+
+ CStatus ReadStripe(int64_t stripe, shared_ptr[CRecordBatch]* out)
+ CStatus ReadStripe(int64_t stripe, std_vector[int], shared_ptr[CRecordBatch]* out)
+
+ CStatus Read(shared_ptr[CTable]* out)
+ CStatus Read(std_vector[int], shared_ptr[CTable]* out)
+
+ int64_t NumberOfStripes()
+
+ int64_t NumberOfRows()
diff --git a/python/pyarrow/_orc.pyx b/python/pyarrow/_orc.pyx
new file mode 100644
index 0000000..7ff4bac
--- /dev/null
+++ b/python/pyarrow/_orc.pyx
@@ -0,0 +1,111 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# cython: profile=False
+# distutils: language = c++
+# cython: embedsignature = True
+
+from cython.operator cimport dereference as deref
+from libcpp.vector cimport vector as std_vector
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.lib cimport (check_status,
+ MemoryPool, maybe_unbox_memory_pool,
+ Schema, pyarrow_wrap_schema,
+ RecordBatch,
+ pyarrow_wrap_table,
+ get_reader)
+import six
+
+
+cdef class ORCReader:
+ cdef:
+ object source
+ CMemoryPool* allocator
+ unique_ptr[ORCFileReader] reader
+
+ def __cinit__(self, MemoryPool memory_pool=None):
+ self.allocator = maybe_unbox_memory_pool(memory_pool)
+
+ def open(self, object source):
+ cdef:
+ shared_ptr[RandomAccessFile] rd_handle
+
+ self.source = source
+
+ get_reader(source, &rd_handle)
+ with nogil:
+ check_status(ORCFileReader.Open(rd_handle, self.allocator,
+ &self.reader))
+
+ def schema(self):
+ """
+ The arrow schema for this file.
+
+ Returns
+ -------
+ schema : pyarrow.Schema
+ """
+ cdef:
+ shared_ptr[CSchema] sp_arrow_schema
+
+ with nogil:
+ check_status(deref(self.reader).ReadSchema(&sp_arrow_schema))
+
+ return pyarrow_wrap_schema(sp_arrow_schema)
+
+ def nrows(self):
+ return deref(self.reader).NumberOfRows();
+
+ def nstripes(self):
+ return deref(self.reader).NumberOfStripes();
+
+ def read_stripe(self, n, include_indices=None):
+ cdef:
+ shared_ptr[CRecordBatch] sp_record_batch
+ RecordBatch batch
+ int64_t stripe
+ std_vector[int] indices
+
+ stripe = n
+
+ if include_indices is None:
+ with nogil:
+ check_status(deref(self.reader).ReadStripe(stripe, &sp_record_batch))
+ else:
+ indices = include_indices
+ with nogil:
+ check_status(deref(self.reader).ReadStripe(stripe, indices, &sp_record_batch))
+
+ batch = RecordBatch()
+ batch.init(sp_record_batch)
+ return batch
+
+ def read(self, include_indices=None):
+ cdef:
+ shared_ptr[CTable] sp_table
+ std_vector[int] indices
+
+ if include_indices is None:
+ with nogil:
+ check_status(deref(self.reader).Read(&sp_table))
+ else:
+ indices = include_indices
+ with nogil:
+ check_status(deref(self.reader).Read(indices, &sp_table))
+
+ return pyarrow_wrap_table(sp_table)
diff --git a/python/pyarrow/orc.py b/python/pyarrow/orc.py
new file mode 100644
index 0000000..22451d5
--- /dev/null
+++ b/python/pyarrow/orc.py
@@ -0,0 +1,149 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from itertools import count
+from numbers import Integral
+
+from pyarrow import _orc
+from pyarrow import types
+from pyarrow.lib import Schema
+
+
+def _is_map(typ):
+ return (types.is_list(typ) and
+ types.is_struct(typ.value_type) and
+ typ.value_type.num_children == 2 and
+ typ.value_type[0].name == 'key' and
+ typ.value_type[1].name == 'value')
+
+
+def _traverse(typ, counter):
+ if isinstance(typ, Schema) or types.is_struct(typ):
+ for field in typ:
+ path = (field.name,)
+ yield path, next(counter)
+ for sub, c in _traverse(field.type, counter):
+ yield path + sub, c
+ elif _is_map(typ):
+ for sub_c in _traverse(typ.value_type, counter):
+ yield sub_c
+ elif types.is_list(typ):
+ # Skip one index for list type, since this can never be selected
+ # directly
+ next(counter)
+ for sub_c in _traverse(typ.value_type, counter):
+ yield sub_c
+ elif types.is_union(typ):
+ # Union types not supported, just skip the indexes
+ for dtype in typ:
+ next(counter)
+ for sub_c in _traverse(dtype, counter):
+ pass
+
+
+def _schema_to_indices(schema):
+ return {'.'.join(i): c for i, c in _traverse(schema, count(1))}
+
+
+class ORCFile(object):
+ """
+ Reader interface for a single ORC file
+
+ Parameters
+ ----------
+ source : str or pyarrow.io.NativeFile
+ Readable source. For passing Python file objects or byte buffers,
+ see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
+ """
+ def __init__(self, source):
+ self.reader = _orc.ORCReader()
+ self.reader.open(source)
+ self._column_index_lookup = _schema_to_indices(self.schema)
+
+ @property
+ def schema(self):
+ """The file schema, as an arrow schema"""
+ return self.reader.schema()
+
+ @property
+ def nrows(self):
+ """The number of rows in the file"""
+ return self.reader.nrows()
+
+ @property
+ def nstripes(self):
+ """The number of stripes in the file"""
+ return self.reader.nstripes()
+
+ def _select_indices(self, columns=None):
+ if columns is None:
+ return None
+
+ schema = self.schema
+ indices = []
+ for col in columns:
+ if isinstance(col, Integral):
+ col = int(col)
+ if 0 <= col < len(schema):
+ col = schema[col].name
+ else:
+ raise ValueError("Column indices must be in 0 <= ind < %d,"
+ " got %d" % (len(schema), col))
+ if col in self._column_index_lookup:
+ indices.append(self._column_index_lookup[col])
+ else:
+ raise ValueError("Unknown column name %r" % col)
+
+ return indices
+
+ def read_stripe(self, n, columns=None):
+ """Read a single stripe from the file.
+
+ Parameters
+ ----------
+ n : int
+ The stripe index
+ columns : list
+ If not None, only these columns will be read from the stripe. A
+ column name may be a prefix of a nested field, e.g. 'a' will select
+ 'a.b', 'a.c', and 'a.d.e'
+
+ Returns
+ -------
+ pyarrow.lib.RecordBatch
+ Content of the stripe as a RecordBatch.
+ """
+ include_indices = self._select_indices(columns)
+ return self.reader.read_stripe(n, include_indices=include_indices)
+
+ def read(self, columns=None):
+ """Read the whole file.
+
+ Parameters
+ ----------
+ columns : list
+ If not None, only these columns will be read from the file. A
+ column name may be a prefix of a nested field, e.g. 'a' will select
+ 'a.b', 'a.c', and 'a.d.e'
+
+ Returns
+ -------
+ pyarrow.lib.Table
+ Content of the file as a Table.
+ """
+ include_indices = self._select_indices(columns)
+ return self.reader.read(include_indices=include_indices)
diff --git a/python/setup.py b/python/setup.py
index 32e76ab..3d3831d 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -84,6 +84,7 @@ class build_ext(_build_ext):
('with-parquet', None, 'build the Parquet extension'),
('with-static-parquet', None, 'link parquet statically'),
('with-plasma', None, 'build the Plasma extension'),
+ ('with-orc', None, 'build the ORC extension'),
('bundle-arrow-cpp', None,
'bundle the Arrow C++ libraries')] +
_build_ext.user_options)
@@ -109,12 +110,15 @@ class build_ext(_build_ext):
os.environ.get('PYARROW_WITH_STATIC_BOOST', '1'))
self.with_plasma = strtobool(
os.environ.get('PYARROW_WITH_PLASMA', '0'))
+ self.with_orc = strtobool(
+ os.environ.get('PYARROW_WITH_ORC', '0'))
self.bundle_arrow_cpp = strtobool(
os.environ.get('PYARROW_BUNDLE_ARROW_CPP', '0'))
CYTHON_MODULE_NAMES = [
'lib',
'_parquet',
+ '_orc',
'plasma']
def _run_cmake(self):
@@ -157,6 +161,9 @@ class build_ext(_build_ext):
if self.with_plasma:
cmake_options.append('-DPYARROW_BUILD_PLASMA=on')
+ if self.with_orc:
+ cmake_options.append('-DPYARROW_BUILD_ORC=on')
+
if len(self.cmake_cxxflags) > 0:
cmake_options.append('-DPYARROW_CXXFLAGS="{0}"'
.format(self.cmake_cxxflags))
@@ -284,6 +291,8 @@ class build_ext(_build_ext):
return True
if name == 'plasma' and not self.with_plasma:
return True
+ if name == '_orc' and not self.with_orc:
+ return True
return False
def _get_inplace_dir(self):
--
To stop receiving notification emails like this one, please contact
['"commits@arrow.apache.org" <co...@arrow.apache.org>'].