You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by em...@apache.org on 2019/06/06 09:30:01 UTC
[arrow] branch master updated: ARROW-4714: [C++][JAVA] Providing
JNI interface to Read ORC file via Arrow C++
This is an automated email from the ASF dual-hosted git repository.
emkornfield pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ec6879b ARROW-4714: [C++][JAVA] Providing JNI interface to Read ORC file via Arrow C++
ec6879b is described below
commit ec6879b76413c8b28cb810b28fb6e729bfdcbfef
Author: Yurui Zhou <yu...@alibaba-inc.com>
AuthorDate: Thu Jun 6 02:28:51 2019 -0700
ARROW-4714: [C++][JAVA] Providing JNI interface to Read ORC file via Arrow C++
- setup necessary dev environment for JNI development on JAVA and C++ codebase
- implemented JNI interface to enable reading arrow record batch from ORC files
- implemented a naive arrow buffer reference manager to ensure c++ memory release
Author: Yurui Zhou <yu...@alibaba-inc.com>
Closes #4348 from yuruiz/JniOrcReader and squashes the following commits:
41592bf04 <Yurui Zhou> minor doc fix
44b54203b <Yurui Zhou> make sure lookup operation are performed under lock
706c8dcd5 <Yurui Zhou> resolve comments
de8529cf1 <Yurui Zhou> resolve comments
fc801756d <Yurui Zhou> resolve comments
9b04b762d <Yurui Zhou> fix style issues and add proper docs
9b13d7f62 <Yurui Zhou> replace nullptr with NULLPTR macro
dd981af5f <Yurui Zhou> fix lint and clang-format
44505df5d <Yurui Zhou> Fix cmake format
f2a0c04b8 <Yurui Zhou> destruct schema reader when finish reading
4f89e3454 <Yurui Zhou> Make sure resources are properly released.
26d74db82 <Yurui Zhou> fix minor style check error
ce30933b1 <Yurui Zhou> Add Arrow Jni Reader Unittests
7a80fbd0d <Yurui Zhou> Minor refactor
e4c063041 <Yurui Zhou> remove redundant code
e932aa8b1 <Yurui Zhou> Move jni code to src/jni and change build flag to arrow_jni
1b6a7042a <Yurui Zhou> Interface refactor and performance optimization
3604c24d3 <Yurui Zhou> Resolve merge conflicts
1c0e0b2a5 <Yurui Zhou> Fix minor build errors
e0d9c1f29 <Yurui Zhou> implement JNI interface on both size
a1e80a6dd <Yurui Zhou> Add arrow-orc setup
---
cpp/CMakeLists.txt | 8 +
cpp/build-support/lint_cpp_cli.py | 1 +
cpp/cmake_modules/BuildUtils.cmake | 17 +-
cpp/cmake_modules/DefineOptions.cmake | 2 +
cpp/src/jni/CMakeLists.txt | 24 ++
cpp/src/jni/orc/CMakeLists.txt | 55 ++++
cpp/src/jni/orc/concurrent_map.h | 80 ++++++
cpp/src/jni/orc/jni_wrapper.cpp | 310 +++++++++++++++++++++
java/README.md | 9 +
java/adapter/orc/CMakeLists.txt | 43 +++
java/adapter/orc/pom.xml | 124 +++++++++
.../org/apache/arrow/adapter/orc/OrcFieldNode.java | 45 +++
.../org/apache/arrow/adapter/orc/OrcJniUtils.java | 62 +++++
.../arrow/adapter/orc/OrcMemoryJniWrapper.java | 77 +++++
.../org/apache/arrow/adapter/orc/OrcReader.java | 90 ++++++
.../arrow/adapter/orc/OrcReaderJniWrapper.java | 79 ++++++
.../apache/arrow/adapter/orc/OrcRecordBatch.java | 47 ++++
.../arrow/adapter/orc/OrcReferenceManager.java | 122 ++++++++
.../apache/arrow/adapter/orc/OrcStripeReader.java | 122 ++++++++
.../adapter/orc/OrcStripeReaderJniWrapper.java | 45 +++
.../apache/arrow/adapter/orc/OrcReaderTest.java | 105 +++++++
java/pom.xml | 15 +
22 files changed, 1476 insertions(+), 6 deletions(-)
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index c905645..501c541 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -256,6 +256,10 @@ if(MSVC)
set(ARROW_USE_GLOG OFF)
endif()
+if(ARROW_JNI)
+ set(ARROW_BUILD_STATIC ON)
+endif()
+
if(ARROW_ORC)
set(ARROW_WITH_LZ4 ON)
set(ARROW_WITH_SNAPPY ON)
@@ -729,6 +733,10 @@ if(ARROW_PARQUET)
endif()
endif()
+if(ARROW_JNI)
+ add_subdirectory(src/jni)
+endif()
+
if(ARROW_GANDIVA)
add_subdirectory(src/gandiva)
endif()
diff --git a/cpp/build-support/lint_cpp_cli.py b/cpp/build-support/lint_cpp_cli.py
index ab2de59..e0fee00 100644
--- a/cpp/build-support/lint_cpp_cli.py
+++ b/cpp/build-support/lint_cpp_cli.py
@@ -77,6 +77,7 @@ EXCLUSIONS = _paths('''\
arrow/visitor_inline.h
gandiva/cache.h
gandiva/jni
+ jni/
test
internal''')
diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.cmake
index 45cff6e..5f04254 100644
--- a/cpp/cmake_modules/BuildUtils.cmake
+++ b/cpp/cmake_modules/BuildUtils.cmake
@@ -139,7 +139,8 @@ function(ADD_ARROW_LIB LIB_NAME)
PRIVATE_INCLUDES
DEPENDENCIES
SHARED_INSTALL_INTERFACE_LIBS
- STATIC_INSTALL_INTERFACE_LIBS)
+ STATIC_INSTALL_INTERFACE_LIBS
+ OUTPUT_PATH)
cmake_parse_arguments(ARG
"${options}"
"${one_value_args}"
@@ -164,6 +165,11 @@ function(ADD_ARROW_LIB LIB_NAME)
else()
set(BUILD_STATIC ${ARROW_BUILD_STATIC})
endif()
+ if(ARG_OUTPUT_PATH)
+ set(OUTPUT_PATH ${ARG_OUTPUT_PATH})
+ else()
+ set(OUTPUT_PATH ${BUILD_OUTPUT_ROOT_DIRECTORY})
+ endif()
if(WIN32 OR (CMAKE_GENERATOR STREQUAL Xcode))
# We need to compile C++ separately for each library kind (shared and static)
@@ -234,11 +240,11 @@ function(ADD_ARROW_LIB LIB_NAME)
set_target_properties(${LIB_NAME}_shared
PROPERTIES LIBRARY_OUTPUT_DIRECTORY
- "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+ "${OUTPUT_PATH}"
RUNTIME_OUTPUT_DIRECTORY
- "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+ "${OUTPUT_PATH}"
PDB_OUTPUT_DIRECTORY
- "${BUILD_OUTPUT_ROOT_DIRECTORY}"
+ "${OUTPUT_PATH}"
LINK_FLAGS
"${ARG_SHARED_LINK_FLAGS}"
OUTPUT_NAME
@@ -313,8 +319,7 @@ function(ADD_ARROW_LIB LIB_NAME)
endif()
set_target_properties(${LIB_NAME}_static
- PROPERTIES LIBRARY_OUTPUT_DIRECTORY
- "${BUILD_OUTPUT_ROOT_DIRECTORY}" OUTPUT_NAME
+ PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${OUTPUT_PATH}" OUTPUT_NAME
${LIB_NAME_STATIC})
if(ARG_STATIC_INSTALL_INTERFACE_LIBS)
diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake
index b00af80..d041da5 100644
--- a/cpp/cmake_modules/DefineOptions.cmake
+++ b/cpp/cmake_modules/DefineOptions.cmake
@@ -151,6 +151,8 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}")
define_option(ARROW_ORC "Build the Arrow ORC adapter" OFF)
+ define_option(ARROW_JNI "Build the Arrow JNI lib" OFF)
+
define_option(ARROW_TENSORFLOW "Build Arrow with TensorFlow support enabled" OFF)
define_option(ARROW_JEMALLOC "Build the Arrow jemalloc-based allocator" ON)
diff --git a/cpp/src/jni/CMakeLists.txt b/cpp/src/jni/CMakeLists.txt
new file mode 100644
index 0000000..3872d67
--- /dev/null
+++ b/cpp/src/jni/CMakeLists.txt
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# arrow_jni
+#
+
+# The only JNI binding in this directory is the ORC one; it is added to the
+# build only when ARROW_ORC is enabled.
+if(ARROW_ORC)
+ add_subdirectory(orc)
+endif()
diff --git a/cpp/src/jni/orc/CMakeLists.txt b/cpp/src/jni/orc/CMakeLists.txt
new file mode 100644
index 0000000..54705fc
--- /dev/null
+++ b/cpp/src/jni/orc/CMakeLists.txt
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# arrow_orc_jni
+#
+
+# cmake_minimum_required() must precede project() so that the policy defaults
+# are in place before the project is declared.
+cmake_minimum_required(VERSION 3.11)
+
+project(arrow_orc_jni)
+
+find_package(JNI REQUIRED)
+
+# Umbrella target; the libraries produced by add_arrow_lib() below are attached
+# to it at the end of this file.
+add_custom_target(arrow_orc_jni)
+
+# Directory handed to the Java sub-project as the destination for the
+# generated native headers, and added to the include path of the JNI library.
+set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated")
+
+# Builds the Java wrapper classes (target arrow_orc_java) and generates the
+# JNI headers into JNI_HEADERS_DIR.
+add_subdirectory(../../../../java/adapter/orc ./java)
+
+# NOTE(review): presumably this keeps add_arrow_lib() from also producing a
+# static variant of the JNI library; only BUILD_SHARED is requested below.
+set(ARROW_BUILD_STATIC OFF)
+
+add_arrow_lib(arrow_orc_jni
+              BUILD_SHARED
+              SOURCES
+              jni_wrapper.cpp
+              OUTPUTS
+              ARROW_ORC_JNI_LIBRARIES
+              SHARED_PRIVATE_LINK_LIBS
+              arrow_static
+              EXTRA_INCLUDES
+              ${JNI_HEADERS_DIR}
+              PRIVATE_INCLUDES
+              ${JNI_INCLUDE_DIRS}
+              ${CMAKE_CURRENT_BINARY_DIR}
+              DEPENDENCIES
+              arrow_static
+              arrow_orc_java
+              OUTPUT_PATH
+              ${CMAKE_CURRENT_BINARY_DIR})
+
+add_dependencies(arrow_orc_jni ${ARROW_ORC_JNI_LIBRARIES})
diff --git a/cpp/src/jni/orc/concurrent_map.h b/cpp/src/jni/orc/concurrent_map.h
new file mode 100644
index 0000000..9ca2fc8
--- /dev/null
+++ b/cpp/src/jni/orc/concurrent_map.h
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JNI_ID_TO_MODULE_MAP_H
+#define JNI_ID_TO_MODULE_MAP_H
+
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace jni {
+
+/**
+ * A mutex-protected registry that maps numeric ids to native objects so that
+ * Java code can refer to them through a jlong handle.
+ *
+ * Every public member function acquires the internal mutex for the duration
+ * of the call.
+ *
+ * @tparam Holder type of the stored value. Lookup() returns NULLPTR on a miss,
+ *         so Holder must be constructible from a null pointer (for example a
+ *         std::shared_ptr).
+ */
+template <typename Holder>
+class ConcurrentMap {
+ public:
+  ConcurrentMap() : module_id_(init_module_id_) {}
+
+  /**
+   * Store a holder under a freshly assigned id.
+   * @param holder the value to register.
+   * @return the id under which the value can later be looked up or erased.
+   */
+  jlong Insert(Holder holder) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    jlong result = module_id_++;
+    // Move the by-value parameter into the map instead of copying it again.
+    map_.emplace(result, std::move(holder));
+    return result;
+  }
+
+  /**
+   * Remove the entry registered under module_id; unknown ids are ignored.
+   */
+  void Erase(jlong module_id) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    map_.erase(module_id);
+  }
+
+  /**
+   * Find the value registered under module_id.
+   * @return a copy of the stored holder, or NULLPTR when the id is unknown.
+   */
+  Holder Lookup(jlong module_id) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    auto it = map_.find(module_id);
+    if (it != map_.end()) {
+      return it->second;
+    }
+    return NULLPTR;
+  }
+
+  /**
+   * Drop every registered entry. The id counter is not reset.
+   */
+  void Clear() {
+    std::lock_guard<std::mutex> lock(mtx_);
+    map_.clear();
+  }
+
+ private:
+  // Initialize the module id starting value to a number greater than zero
+  // to allow for easier debugging of uninitialized java variables.
+  static constexpr int init_module_id_ = 4;
+
+  // next id to hand out; only touched while mtx_ is held
+  int64_t module_id_;
+  std::mutex mtx_;
+  // map from module ids returned to Java and module pointers
+  std::unordered_map<jlong, Holder> map_;
+};
+
+} // namespace jni
+} // namespace arrow
+
+#endif // JNI_ID_TO_MODULE_MAP_H
diff --git a/cpp/src/jni/orc/jni_wrapper.cpp b/cpp/src/jni/orc/jni_wrapper.cpp
new file mode 100644
index 0000000..f18bacc
--- /dev/null
+++ b/cpp/src/jni/orc/jni_wrapper.cpp
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <arrow/adapters/orc/adapter.h>
+#include <arrow/array.h>
+#include <arrow/buffer.h>
+#include <arrow/io/api.h>
+#include <arrow/ipc/api.h>
+#include <arrow/util/logging.h>
+#include <cassert>
+#include <iostream>
+#include <string>
+
+#include "org_apache_arrow_adapter_orc_OrcMemoryJniWrapper.h"
+#include "org_apache_arrow_adapter_orc_OrcReaderJniWrapper.h"
+#include "org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper.h"
+
+#include "./concurrent_map.h"
+
+using ORCFileReader = arrow::adapters::orc::ORCFileReader;
+using RecordBatchReader = arrow::RecordBatchReader;
+
+static jclass io_exception_class;
+static jclass illegal_access_exception_class;
+static jclass illegal_argument_exception_class;
+
+static jclass orc_field_node_class;
+static jmethodID orc_field_node_constructor;
+
+static jclass orc_memory_class;
+static jmethodID orc_memory_constructor;
+
+static jclass record_batch_class;
+static jmethodID record_batch_constructor;
+
+static jint JNI_VERSION = JNI_VERSION_1_6;
+
+using arrow::jni::ConcurrentMap;
+
+static ConcurrentMap<std::shared_ptr<arrow::Buffer>> buffer_holder_;
+static ConcurrentMap<std::shared_ptr<RecordBatchReader>> orc_stripe_reader_holder_;
+static ConcurrentMap<std::shared_ptr<ORCFileReader>> orc_reader_holder_;
+
+// Resolve class_name through FindClass and promote the result to a JNI global
+// reference. The temporary local reference is released before returning.
+jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
+  jclass local_ref = env->FindClass(class_name);
+  auto pinned_ref = static_cast<jclass>(env->NewGlobalRef(local_ref));
+  env->DeleteLocalRef(local_ref);
+  return pinned_ref;
+}
+
+// Look up an instance method (or constructor, name "<init>") on this_class.
+//
+// Returns the method id, or nullptr when the lookup fails. On a failed lookup
+// an IllegalAccessException describing the missing method is raised in the
+// JVM. When this_class itself is null the function returns nullptr without
+// calling into JNI.
+jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) {
+  if (this_class == nullptr) {
+    // The class lookup already failed; passing a null class to JNI is invalid.
+    return nullptr;
+  }
+
+  jmethodID ret = env->GetMethodID(this_class, name, sig);
+  if (ret == nullptr) {
+    std::string error_message = "Unable to find method " + std::string(name) +
+                                " with signature " + std::string(sig);
+    env->ThrowNew(illegal_access_exception_class, error_message.c_str());
+  }
+
+  return ret;
+}
+
+// Convert a Java string into a std::string holding its modified UTF-8 bytes.
+// A null jstring yields an empty string.
+std::string JStringToCString(JNIEnv* env, jstring string) {
+  if (string == nullptr) {
+    return std::string();
+  }
+
+  const jsize clen = env->GetStringUTFLength(string);  // length in bytes
+  const jsize jlen = env->GetStringLength(string);     // length in UTF-16 units
+
+  // Reserve one extra byte: some JVMs NUL-terminate the output of
+  // GetStringUTFRegion, which would otherwise write past the buffer.
+  std::string buffer(static_cast<size_t>(clen) + 1, '\0');
+  env->GetStringUTFRegion(string, 0, jlen, &buffer[0]);
+  buffer.resize(static_cast<size_t>(clen));
+  return buffer;
+}
+
+// Fetch the ORC file reader registered under id. For an unknown id an
+// IllegalArgumentException is raised in the JVM and an empty pointer is
+// returned, so callers have to check the result before using it.
+std::shared_ptr<ORCFileReader> GetFileReader(JNIEnv* env, jlong id) {
+  std::shared_ptr<ORCFileReader> found = orc_reader_holder_.Lookup(id);
+  if (found) {
+    return found;
+  }
+
+  const std::string message = "invalid reader id " + std::to_string(id);
+  env->ThrowNew(illegal_argument_exception_class, message.c_str());
+  return found;
+}
+
+// Fetch the stripe reader registered under id. For an unknown id an
+// IllegalArgumentException is raised in the JVM and an empty pointer is
+// returned, so callers have to check the result before using it.
+std::shared_ptr<RecordBatchReader> GetStripeReader(JNIEnv* env, jlong id) {
+  std::shared_ptr<RecordBatchReader> found = orc_stripe_reader_holder_.Lookup(id);
+  if (found) {
+    return found;
+  }
+
+  const std::string message = "invalid stripe reader id " + std::to_string(id);
+  env->ThrowNew(illegal_argument_exception_class, message.c_str());
+  return found;
+}
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Library load hook: caches global references to the Java classes and the
+// constructor ids that the native functions in this file use.
+// Returns JNI_VERSION on success, JNI_ERR when no JNIEnv can be obtained.
+jint JNI_OnLoad(JavaVM* vm, void* reserved) {
+ JNIEnv* env;
+ if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) != JNI_OK) {
+ return JNI_ERR;
+ }
+
+ // Exception classes thrown from native code.
+ // NOTE(review): the names use the "L...;" descriptor form; the JNI spec
+ // describes FindClass as taking "java/io/IOException" style names - confirm
+ // this works on every JVM that has to be supported.
+ io_exception_class = CreateGlobalClassReference(env, "Ljava/io/IOException;");
+ illegal_access_exception_class =
+ CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;");
+ illegal_argument_exception_class =
+ CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;");
+
+ // OrcFieldNode(int, int)
+ orc_field_node_class =
+ CreateGlobalClassReference(env, "Lorg/apache/arrow/adapter/orc/OrcFieldNode;");
+ orc_field_node_constructor = GetMethodID(env, orc_field_node_class, "<init>", "(II)V");
+
+ // OrcMemoryJniWrapper(long, long, long, long)
+ orc_memory_class = CreateGlobalClassReference(
+ env, "Lorg/apache/arrow/adapter/orc/OrcMemoryJniWrapper;");
+ orc_memory_constructor = GetMethodID(env, orc_memory_class, "<init>", "(JJJJ)V");
+
+ // OrcRecordBatch(int, OrcFieldNode[], OrcMemoryJniWrapper[])
+ record_batch_class =
+ CreateGlobalClassReference(env, "Lorg/apache/arrow/adapter/orc/OrcRecordBatch;");
+ record_batch_constructor =
+ GetMethodID(env, record_batch_class, "<init>",
+ "(I[Lorg/apache/arrow/adapter/orc/OrcFieldNode;"
+ "[Lorg/apache/arrow/adapter/orc/OrcMemoryJniWrapper;)V");
+
+ // Report any exception left pending by the lookups above; the load itself
+ // is not failed because of it.
+ env->ExceptionDescribe();
+
+ return JNI_VERSION;
+}
+
+// Library unload hook: releases the global class references created in
+// JNI_OnLoad and drops every native object still registered in the holders.
+void JNI_OnUnload(JavaVM* vm, void* reserved) {
+  JNIEnv* env = nullptr;
+  // Only touch the JNI references when an environment is actually available;
+  // GetEnv leaves env unusable on failure.
+  if (vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION) == JNI_OK &&
+      env != nullptr) {
+    env->DeleteGlobalRef(io_exception_class);
+    env->DeleteGlobalRef(illegal_access_exception_class);
+    env->DeleteGlobalRef(illegal_argument_exception_class);
+    env->DeleteGlobalRef(orc_field_node_class);
+    env->DeleteGlobalRef(orc_memory_class);
+    env->DeleteGlobalRef(record_batch_class);
+  }
+
+  // Native resources are released regardless of the JNI environment state.
+  buffer_holder_.Clear();
+  orc_stripe_reader_holder_.Clear();
+  orc_reader_holder_.Clear();
+}
+
+// Open the ORC file at file_path and register a reader for it.
+//
+// Returns the id of the registered reader on success. When the local file
+// cannot be opened the negated arrow status code is returned. For hdfs paths
+// and for files that cannot be parsed as ORC a java.io.IOException is raised
+// and a negative value is returned; no reader is registered in those cases.
+JNIEXPORT jlong JNICALL Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_open(
+    JNIEnv* env, jobject this_obj, jstring file_path) {
+  std::string path = JStringToCString(env, file_path);
+
+  if (path.find("hdfs://") == 0) {
+    env->ThrowNew(io_exception_class, "hdfs path not supported yet.");
+    // Do not fall through: there is no file to hand to the ORC reader.
+    return static_cast<jlong>(arrow::StatusCode::NotImplemented) * -1;
+  }
+
+  std::shared_ptr<arrow::io::ReadableFile> in_file;
+  arrow::Status ret = arrow::io::ReadableFile::Open(path, &in_file);
+  if (!ret.ok()) {
+    return static_cast<jlong>(ret.code()) * -1;
+  }
+
+  std::unique_ptr<ORCFileReader> reader;
+  ret = ORCFileReader::Open(
+      std::static_pointer_cast<arrow::io::RandomAccessFile>(in_file),
+      arrow::default_memory_pool(), &reader);
+
+  if (!ret.ok() || !reader) {
+    env->ThrowNew(io_exception_class,
+                  std::string("Failed to open file " + path).c_str());
+    // Never register a null reader; later calls would dereference it.
+    jlong error_code = ret.ok() ? static_cast<jlong>(arrow::StatusCode::Invalid)
+                                : static_cast<jlong>(ret.code());
+    return error_code * -1;
+  }
+
+  return orc_reader_holder_.Insert(std::shared_ptr<ORCFileReader>(reader.release()));
+}
+
+// Remove the ORC file reader registered under id from the reader registry.
+// Unknown ids are ignored.
+JNIEXPORT void JNICALL Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_close(
+ JNIEnv* env, jobject this_obj, jlong id) {
+ orc_reader_holder_.Erase(id);
+}
+
+// Position the reader registered under id at row_number.
+// Returns true when the seek succeeded. For an unknown id an
+// IllegalArgumentException is pending and false is returned.
+JNIEXPORT jboolean JNICALL Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_seek(
+    JNIEnv* env, jobject this_obj, jlong id, jint row_number) {
+  auto reader = GetFileReader(env, id);
+  if (!reader) {
+    // GetFileReader already raised the Java exception.
+    return JNI_FALSE;
+  }
+  return reader->Seek(row_number).ok() ? JNI_TRUE : JNI_FALSE;
+}
+
+// Return the number of stripes of the file read by the reader registered
+// under id. For an unknown id an IllegalArgumentException is pending and 0 is
+// returned.
+JNIEXPORT jint JNICALL
+Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_getNumberOfStripes(JNIEnv* env,
+                                                                         jobject this_obj,
+                                                                         jlong id) {
+  auto reader = GetFileReader(env, id);
+  if (!reader) {
+    // GetFileReader already raised the Java exception.
+    return 0;
+  }
+  return reader->NumberOfStripes();
+}
+
+// Create a reader for the next stripe of the file reader registered under id
+// and register it.
+//
+// Returns the id of the new stripe reader, or a negated arrow status code
+// when no stripe reader could be produced. For an unknown id an
+// IllegalArgumentException is pending and a negative value is returned.
+JNIEXPORT jlong JNICALL
+Java_org_apache_arrow_adapter_orc_OrcReaderJniWrapper_nextStripeReader(JNIEnv* env,
+                                                                       jobject this_obj,
+                                                                       jlong id,
+                                                                       jlong batch_size) {
+  auto reader = GetFileReader(env, id);
+  if (!reader) {
+    // GetFileReader already raised the Java exception.
+    return static_cast<jlong>(arrow::StatusCode::Invalid) * -1;
+  }
+
+  std::shared_ptr<RecordBatchReader> stripe_reader;
+  auto status = reader->NextStripeReader(batch_size, &stripe_reader);
+
+  if (!status.ok()) {
+    return static_cast<jlong>(status.code()) * -1;
+  }
+
+  if (!stripe_reader) {
+    return static_cast<jlong>(arrow::StatusCode::Invalid) * -1;
+  }
+
+  return orc_stripe_reader_holder_.Insert(stripe_reader);
+}
+
+// Serialize the schema of the stripe reader registered under id into a Java
+// byte array.
+//
+// Returns null when the id is unknown (an IllegalArgumentException is then
+// pending), when serialization fails, or when the byte array cannot be
+// allocated.
+JNIEXPORT jbyteArray JNICALL
+Java_org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper_getSchema(JNIEnv* env,
+                                                                      jclass this_cls,
+                                                                      jlong id) {
+  auto stripe_reader = GetStripeReader(env, id);
+  if (!stripe_reader) {
+    // GetStripeReader already raised the Java exception.
+    return nullptr;
+  }
+
+  auto schema = stripe_reader->schema();
+
+  std::shared_ptr<arrow::Buffer> out;
+  auto status =
+      arrow::ipc::SerializeSchema(*schema, nullptr, arrow::default_memory_pool(), &out);
+  if (!status.ok()) {
+    return nullptr;
+  }
+
+  const jsize length = static_cast<jsize>(out->size());
+  jbyteArray ret = env->NewByteArray(length);
+  if (ret == nullptr) {
+    // Allocation failed; the JVM has an OutOfMemoryError pending.
+    return nullptr;
+  }
+  auto src = reinterpret_cast<const jbyte*>(out->data());
+  env->SetByteArrayRegion(ret, 0, length, src);
+  return ret;
+}
+
+// Read the next record batch from the stripe reader registered under id and
+// wrap it into an OrcRecordBatch Java object.
+//
+// Each top-level column contributes one OrcFieldNode (length, null count) and
+// one OrcMemoryJniWrapper per buffer slot. Every buffer is registered in
+// buffer_holder_ so that it stays alive until Java releases it by id.
+//
+// Returns null when the id is unknown (an IllegalArgumentException is then
+// pending), when reading fails, or when the stripe is exhausted.
+JNIEXPORT jobject JNICALL
+Java_org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper_next(JNIEnv* env,
+                                                                 jclass this_cls,
+                                                                 jlong id) {
+  auto stripe_reader = GetStripeReader(env, id);
+  if (!stripe_reader) {
+    // GetStripeReader already raised the Java exception.
+    return nullptr;
+  }
+
+  std::shared_ptr<arrow::RecordBatch> record_batch;
+  auto status = stripe_reader->ReadNext(&record_batch);
+  if (!status.ok() || !record_batch) {
+    return nullptr;
+  }
+
+  auto schema = stripe_reader->schema();
+
+  // create OrcFieldNode[]
+  jobjectArray field_array =
+      env->NewObjectArray(schema->num_fields(), orc_field_node_class, nullptr);
+
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  for (int i = 0; i < schema->num_fields(); ++i) {
+    auto column = record_batch->column(i);
+    auto dataArray = column->data();
+    // The constructor signature is (II)V: pass exactly jint-sized arguments
+    // through the variadic call.
+    jobject field = env->NewObject(orc_field_node_class, orc_field_node_constructor,
+                                   static_cast<jint>(column->length()),
+                                   static_cast<jint>(column->null_count()));
+    env->SetObjectArrayElement(field_array, i, field);
+    // The array holds its own reference; free the local one so that wide
+    // schemas do not exhaust the local reference table.
+    env->DeleteLocalRef(field);
+
+    for (auto& buffer : dataArray->buffers) {
+      buffers.push_back(buffer);
+    }
+  }
+
+  // create OrcMemoryJniWrapper[]
+  jobjectArray memory_array = env->NewObjectArray(static_cast<jsize>(buffers.size()),
+                                                  orc_memory_class, nullptr);
+
+  for (size_t j = 0; j < buffers.size(); ++j) {
+    const auto& buffer = buffers[j];
+
+    // A buffer slot may be empty (e.g. no validity bitmap); describe it as a
+    // zero-sized region instead of dereferencing a null pointer.
+    jlong address = 0;
+    jlong size = 0;
+    jlong capacity = 0;
+    if (buffer) {
+      address = reinterpret_cast<jlong>(buffer->data());
+      size = static_cast<jlong>(buffer->size());
+      capacity = static_cast<jlong>(buffer->capacity());
+    }
+
+    // The constructor signature is (JJJJ)V: every argument must be a jlong.
+    jobject memory = env->NewObject(orc_memory_class, orc_memory_constructor,
+                                    buffer_holder_.Insert(buffer), address, size, capacity);
+    env->SetObjectArrayElement(memory_array, static_cast<jsize>(j), memory);
+    env->DeleteLocalRef(memory);
+  }
+
+  // create OrcRecordBatch
+  jobject ret = env->NewObject(record_batch_class, record_batch_constructor,
+                               static_cast<jint>(record_batch->num_rows()), field_array,
+                               memory_array);
+
+  return ret;
+}
+
+// Remove the stripe reader registered under id from the stripe reader
+// registry. Unknown ids are ignored.
+JNIEXPORT void JNICALL Java_org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper_close(
+ JNIEnv* env, jclass this_cls, jlong id) {
+ orc_stripe_reader_holder_.Erase(id);
+}
+
+// Remove the buffer registered under id from the buffer registry, dropping
+// the shared_ptr that the registry held for it. Unknown ids are ignored.
+JNIEXPORT void JNICALL Java_org_apache_arrow_adapter_orc_OrcMemoryJniWrapper_release(
+ JNIEnv* env, jobject this_obj, jlong id) {
+ buffer_holder_.Erase(id);
+}
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/java/README.md b/java/README.md
index c69ff88..cecdd40 100644
--- a/java/README.md
+++ b/java/README.md
@@ -45,6 +45,15 @@ mvn install -P gandiva -pl gandiva -am -Dgandiva.cpp.build.dir=../../debug
This library is still in Alpha stages, and subject to API changes without
deprecation warnings.
+## Building and running tests for arrow jni (optional)
+Arrow Cpp must be built before this step. The cpp build directory must
+be provided as the value for the argument arrow.cpp.build.dir, e.g.
+
+```
+cd java
+mvn install -P native-orc -pl arrow-jni -am -Darrow.cpp.build.dir=../../release
+```
+
## Java Code Style Guide
Arrow Java follows the Google style guide [here][3] with the following
diff --git a/java/adapter/orc/CMakeLists.txt b/java/adapter/orc/CMakeLists.txt
new file mode 100644
index 0000000..c6facac
--- /dev/null
+++ b/java/adapter/orc/CMakeLists.txt
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# arrow_orc_java
+#
+
+# Headers: top level
+
+project(arrow_orc_java)
+
+# Find java/jni. Find modules are run through find_package() so that a missing
+# JDK is reported at configure time instead of failing later inside add_jar().
+find_package(Java REQUIRED)
+find_package(JNI REQUIRED)
+include(UseJava)
+
+message("generating headers to ${JNI_HEADERS_DIR}")
+
+# Compile the Java classes needed by the native wrapper and emit their JNI
+# headers into JNI_HEADERS_DIR (set by the including C++ project).
+add_jar(
+  arrow_orc_java
+  src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java
+  src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java
+  src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java
+  src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java
+  src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java
+  src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java
+  GENERATE_NATIVE_HEADERS arrow_orc_java-native
+  DESTINATION ${JNI_HEADERS_DIR}
+)
diff --git a/java/adapter/orc/pom.xml b/java/adapter/orc/pom.xml
new file mode 100644
index 0000000..f718dd2
--- /dev/null
+++ b/java/adapter/orc/pom.xml
@@ -0,0 +1,124 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-format</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ <version>1.5.5</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.2.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-storage-api</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <parent>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-java-root</artifactId>
+ <version>0.14.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <groupId>org.apache.arrow.orc</groupId>
+ <artifactId>arrow-orc</artifactId>
+ <name>Arrow Orc Adapter</name>
+ <packaging>jar</packaging>
+ <properties>
+ <arrow.cpp.build.dir>../../../cpp/release-build/</arrow.cpp.build.dir>
+ </properties>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>${arrow.cpp.build.dir}/src/jni/orc</directory>
+ <includes>
+ <include>**/libarrow_orc_jni.*</include>
+ </includes>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <systemPropertyVariables>
+ <user.timezone>UTC</user.timezone>
+ </systemPropertyVariables>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java
new file mode 100644
index 0000000..716a138
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcFieldNode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+/**
+ * Metadata about Vectors/Arrays that is passed via JNI interface.
+ * Instances are immutable: both values are fixed at construction time.
+ */
+class OrcFieldNode {
+
+ private final int length;
+ private final int nullCount;
+
+ /**
+ * Construct a new instance.
+ * The native wrapper looks this constructor up with the signature (II)V.
+ * @param length the number of values written.
+ * @param nullCount the number of null values.
+ */
+ public OrcFieldNode(int length, int nullCount) {
+ this.length = length;
+ this.nullCount = nullCount;
+ }
+
+ /**
+ * Return the number of values written.
+ * @return the length passed to the constructor
+ */
+ int getLength() {
+ return length;
+ }
+
+ /**
+ * Return the number of null values.
+ * @return the null count passed to the constructor
+ */
+ int getNullCount() {
+ return nullCount;
+ }
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java
new file mode 100644
index 0000000..600569b
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcJniUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * Helper class for JNI related operations.
+ */
+class OrcJniUtils {
+ private static final String LIBRARY_NAME = "arrow_orc_jni";
+ private static boolean isLoaded = false;
+
+ private OrcJniUtils() {}
+
+ static void loadOrcAdapterLibraryFromJar()
+ throws IOException, IllegalAccessException {
+ synchronized (OrcJniUtils.class) {
+ if (!isLoaded) {
+ final String libraryToLoad = System.mapLibraryName(LIBRARY_NAME);
+ final File libraryFile = moveFileFromJarToTemp(
+ System.getProperty("java.io.tmpdir"), libraryToLoad);
+ System.load(libraryFile.getAbsolutePath());
+ isLoaded = true;
+ }
+ }
+ }
+
+ private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad)
+ throws IOException {
+ final File temp = File.createTempFile(tmpDir, libraryToLoad);
+ try (final InputStream is = OrcReaderJniWrapper.class.getClassLoader()
+ .getResourceAsStream(libraryToLoad)) {
+ if (is == null) {
+ throw new FileNotFoundException(libraryToLoad);
+ } else {
+ Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+ return temp;
+ }
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java
new file mode 100644
index 0000000..27f54b7
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcMemoryJniWrapper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+/**
+ * Handle describing a chunk of orc memory allocated by native code.
+ * Closing the handle invokes the native {@code release} method with the
+ * instance id the handle was created with.
+ */
+class OrcMemoryJniWrapper implements AutoCloseable {
+
+  private final long nativeInstanceId;
+  private final long memoryAddress;
+  private final long size;
+  private final long capacity;
+
+  /**
+   * Construct a new instance.
+   * @param nativeInstanceId unique id of the underlying memory.
+   * @param memoryAddress starting memory address of the underlying memory.
+   * @param size size of the valid data.
+   * @param capacity allocated memory size.
+   */
+  OrcMemoryJniWrapper(long nativeInstanceId, long memoryAddress, long size, long capacity) {
+    this.capacity = capacity;
+    this.size = size;
+    this.memoryAddress = memoryAddress;
+    this.nativeInstanceId = nativeInstanceId;
+  }
+
+  /**
+   * Starting address of the underlying chunk of memory.
+   * @return memory address
+   */
+  long getMemoryAddress() {
+    return this.memoryAddress;
+  }
+
+  /**
+   * Size of the underlying chunk of memory managed by this OrcMemoryJniWrapper.
+   * @return underlying memory size
+   */
+  long getCapacity() {
+    return this.capacity;
+  }
+
+  /**
+   * Size of the part of the underlying chunk of memory that has valid data.
+   * @return valid data size
+   */
+  long getSize() {
+    return this.size;
+  }
+
+  /**
+   * Native hook invoked by {@link #close()}.
+   * @param id the instance id handed to the constructor.
+   */
+  private native void release(long id);
+
+  @Override
+  public void close() {
+    release(this.nativeInstanceId);
+  }
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java
new file mode 100644
index 0000000..366489f
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReader.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+import java.io.IOException;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Orc Reader that allows accessing orc stripes in an Orc file.
+ * This orc reader basically acts like an ArrowReader iterator that
+ * iterates over orc stripes. Each stripe is accessed via an
+ * ArrowReader.
+ */
+public class OrcReader implements AutoCloseable {
+  private final OrcReaderJniWrapper jniWrapper;
+  private final BufferAllocator allocator;
+
+  /**
+   * reference to native reader instance.
+   */
+  private final long nativeInstanceId;
+
+  /**
+   * Create an OrcReader that iterates over orc stripes.
+   * @param filePath file path to target file, currently only support local file.
+   * @param allocator allocator provided to ArrowReader.
+   * @throws IOException if the native library cannot be loaded or the native
+   *         open call reports a failure
+   * @throws IllegalAccessException propagated from obtaining the JNI wrapper
+   */
+  public OrcReader(String filePath, BufferAllocator allocator) throws IOException, IllegalAccessException {
+    this.allocator = allocator;
+    this.jniWrapper = OrcReaderJniWrapper.getInstance();
+
+    // The JNI wrapper documents a negative return value as "error code * -1";
+    // fail here instead of keeping an id that does not refer to a reader.
+    final long readerId = jniWrapper.open(filePath);
+    if (readerId < 0) {
+      throw new IOException(
+          "Failed to open orc file " + filePath + ", native error code: " + (-readerId));
+    }
+    this.nativeInstanceId = readerId;
+  }
+
+  /**
+   * Seek to designated row. Invoking nextStripeReader() after seek
+   * will return a stripe reader starting from the designated row.
+   * @param rowNumber the row number to seek
+   * @return true if seek operation is succeeded
+   */
+  public boolean seek(int rowNumber) throws IllegalArgumentException {
+    return jniWrapper.seek(nativeInstanceId, rowNumber);
+  }
+
+  /**
+   * Get a stripe level ArrowReader with specified batchSize in each record batch.
+   *
+   * @param batchSize the number of rows loaded on each iteration
+   * @return ArrowReader that iterates over the current stripe, or {@code null}
+   *         when the native call returns a negative stripe reader id
+   */
+  public ArrowReader nextStripeReader(long batchSize) throws IllegalArgumentException {
+    long stripeReaderId = jniWrapper.nextStripeReader(nativeInstanceId, batchSize);
+    if (stripeReaderId < 0) {
+      return null;
+    }
+
+    return new OrcStripeReader(stripeReaderId, allocator);
+  }
+
+  /**
+   * The number of stripes in the file.
+   *
+   * @return number of stripes
+   */
+  public int getNumberOfStripes() throws IllegalArgumentException {
+    return jniWrapper.getNumberOfStripes(nativeInstanceId);
+  }
+
+  /**
+   * Ask the native side to close the reader identified by this instance.
+   */
+  @Override
+  public void close() throws Exception {
+    jniWrapper.close(nativeInstanceId);
+  }
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java
new file mode 100644
index 0000000..ff449c3
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReaderJniWrapper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+import java.io.IOException;
+
+/**
+ * JNI wrapper for Orc reader.
+ */
+class OrcReaderJniWrapper {
+
+ private static volatile OrcReaderJniWrapper INSTANCE;
+
+ static OrcReaderJniWrapper getInstance() throws IOException, IllegalAccessException {
+ if (INSTANCE == null) {
+ synchronized (OrcReaderJniWrapper.class) {
+ if (INSTANCE == null) {
+ OrcJniUtils.loadOrcAdapterLibraryFromJar();
+ INSTANCE = new OrcReaderJniWrapper();
+ }
+ }
+ }
+
+ return INSTANCE;
+ }
+
+ /**
+ * Construct a orc file reader over the target file.
+ * @param fileName absolute file path of target file
+ * @return id of the orc reader instance if file opened successfully,
+ * otherwise return error code * -1.
+ */
+ native long open(String fileName);
+
+ /**
+ * Release resources associated with designated reader instance.
+ * @param readerId id of the reader instance.
+ */
+ native void close(long readerId);
+
+ /**
+ * Seek to designated row. Invoke nextStripeReader() after seek
+ * will return id of stripe reader starting from designated row.
+ * @param readerId id of the reader instance
+ * @param rowNumber the rows number to seek
+ * @return true if seek operation is succeeded
+ */
+ native boolean seek(long readerId, int rowNumber);
+
+ /**
+ * The number of stripes in the file.
+ * @param readerId id of the reader instance
+ * @return number of stripes
+ */
+ native int getNumberOfStripes(long readerId);
+
+ /**
+ * Get a stripe level ArrowReader with specified batchSize in each record batch.
+ * @param readerId id of the reader instance
+ * @param batchSize the number of rows loaded on each iteration
+ * @return id of the stripe reader instance.
+ */
+ native long nextStripeReader(long readerId, long batchSize);
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java
new file mode 100644
index 0000000..a006cac
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcRecordBatch.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Holder pairing the metadata of one record batch with its native memory.
+ */
+class OrcRecordBatch {
+  /**
+   * Number of records included in this batch.
+   */
+  final int length;
+
+  /**
+   * Nodes correspond to the pre-ordered flattened logical schema.
+   */
+  final List<OrcFieldNode> nodes;
+
+  /**
+   * Buffers for the underlying data.
+   */
+  final List<OrcMemoryJniWrapper> buffers;
+
+  /**
+   * Construct a new instance. The arrays are exposed as fixed-size list views
+   * via {@link Arrays#asList(Object[])}; they are not copied.
+   * @param length number of records included in current batch
+   * @param nodes meta data for each fields
+   * @param buffers buffers for underlying data
+   */
+  OrcRecordBatch(int length, OrcFieldNode[] nodes, OrcMemoryJniWrapper[] buffers) {
+    final List<OrcFieldNode> nodeView = Arrays.asList(nodes);
+    final List<OrcMemoryJniWrapper> bufferView = Arrays.asList(buffers);
+
+    this.length = length;
+    this.nodes = nodeView;
+    this.buffers = bufferView;
+  }
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java
new file mode 100644
index 0000000..457d25e
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcReferenceManager.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.OwnershipTransferResult;
+import org.apache.arrow.memory.ReferenceManager;
+import org.apache.arrow.util.Preconditions;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * Minimal reference manager for memory that was allocated by native code.
+ * A counter tracks the outstanding references; when a release brings the
+ * counter to exactly zero the wrapped {@link OrcMemoryJniWrapper} is closed.
+ */
+public class OrcReferenceManager implements ReferenceManager {
+  private final AtomicInteger bufRefCnt = new AtomicInteger(0);
+
+  private OrcMemoryJniWrapper memory;
+
+  OrcReferenceManager(OrcMemoryJniWrapper memory) {
+    this.memory = memory;
+  }
+
+  @Override
+  public int getRefCount() {
+    return bufRefCnt.get();
+  }
+
+  @Override
+  public boolean release() {
+    final boolean reachedZero = release(1);
+    return reachedZero;
+  }
+
+  @Override
+  public boolean release(int decrement) {
+    Preconditions.checkState(decrement >= 1,
+        "ref count decrement should be greater than or equal to 1");
+
+    final int remaining;
+    synchronized (this) {
+      // lower the counter and close the native memory on the last release
+      remaining = bufRefCnt.addAndGet(-decrement);
+      final boolean lastReference = remaining == 0;
+      if (lastReference) {
+        memory.close();
+      }
+    }
+
+    // a negative counter means more references were released than retained
+    Preconditions.checkState(remaining >= 0, "RefCnt has gone negative");
+    return remaining == 0;
+  }
+
+  @Override
+  public void retain() {
+    retain(1);
+  }
+
+  @Override
+  public void retain(int increment) {
+    Preconditions.checkArgument(increment > 0, "retain(%d) argument is not positive", increment);
+    bufRefCnt.addAndGet(increment);
+  }
+
+  /**
+   * Count one more reference and hand back the very same buffer;
+   * the target allocator is not consulted.
+   */
+  @Override
+  public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
+    retain();
+    return srcBuffer;
+  }
+
+  /**
+   * Build a buffer that shares this reference manager and starts {@code index}
+   * bytes after the start address of {@code sourceBuffer}.
+   */
+  @Override
+  public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, int index, int length) {
+    // starting byte address in the underlying memory for the new ArrowBuf
+    final long startAddress = sourceBuffer.memoryAddress() + index;
+    // length is the size (in bytes) of the new ArrowBuf within the underlying chunk
+    return new ArrowBuf(this, null, length, startAddress, false);
+  }
+
+  @Override
+  public OwnershipTransferResult transferOwnership(ArrowBuf sourceBuffer, BufferAllocator targetAllocator) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BufferAllocator getAllocator() {
+    return null;
+  }
+
+  @Override
+  public int getSize() {
+    final long nativeSize = memory.getSize();
+    return (int) nativeSize;
+  }
+
+  @Override
+  public int getAccountedSize() {
+    return 0;
+  }
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
new file mode 100644
index 0000000..c69e74a
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.flatbuf.MessageHeader;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.message.MessageChannelReader;
+import org.apache.arrow.vector.ipc.message.MessageResult;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
+
+import io.netty.buffer.ArrowBuf;
+
+/**
+ * ArrowReader over a single orc stripe; each batch obtained from the native
+ * stripe reader is loaded as an ArrowRecordBatch.
+ */
+public class OrcStripeReader extends ArrowReader {
+  /**
+   * reference to native stripe reader instance.
+   */
+  private final long nativeInstanceId;
+
+  /**
+   * Construct a new instance.
+   * @param nativeInstanceId nativeInstanceId of the stripe reader instance, obtained by
+   *        calling nextStripeReader from OrcReaderJniWrapper
+   * @param allocator memory allocator for accounting.
+   */
+  OrcStripeReader(long nativeInstanceId, BufferAllocator allocator) {
+    super(allocator);
+    this.nativeInstanceId = nativeInstanceId;
+  }
+
+  /**
+   * Fetch the next batch from the native stripe reader and load it.
+   * @return false when the native reader returns no batch, true otherwise
+   */
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    final OrcRecordBatch nativeBatch = OrcStripeReaderJniWrapper.next(nativeInstanceId);
+    if (nativeBatch == null) {
+      return false;
+    }
+
+    // wrap every native memory chunk in an ArrowBuf with its own reference manager
+    final ArrayList<ArrowBuf> arrowBuffers = nativeBatch.buffers.stream()
+        .map(chunk -> new ArrowBuf(
+            new OrcReferenceManager(chunk),
+            null,
+            (int) chunk.getSize(),
+            chunk.getMemoryAddress(),
+            false))
+        .collect(Collectors.toCollection(ArrayList::new));
+
+    // translate the per-field metadata into Arrow field nodes
+    final ArrayList<ArrowFieldNode> fieldNodes = new ArrayList<>();
+    for (OrcFieldNode node : nativeBatch.nodes) {
+      fieldNodes.add(new ArrowFieldNode(node.getLength(), node.getNullCount()));
+    }
+
+    loadRecordBatch(new ArrowRecordBatch(nativeBatch.length, fieldNodes, arrowBuffers));
+    return true;
+  }
+
+  @Override
+  public long bytesRead() {
+    return 0;
+  }
+
+  @Override
+  protected void closeReadSource() throws IOException {
+    OrcStripeReaderJniWrapper.close(nativeInstanceId);
+  }
+
+  /**
+   * Deserialize the schema from the serialized bytes returned by the native stripe reader.
+   * @throws IOException if no message can be read or the message is not a schema
+   */
+  @Override
+  protected Schema readSchema() throws IOException {
+    final byte[] serializedSchema = OrcStripeReaderJniWrapper.getSchema(nativeInstanceId);
+    final ReadChannel channel =
+        new ReadChannel(new ByteArrayReadableSeekableByteChannel(serializedSchema));
+
+    try (MessageChannelReader messageReader = new MessageChannelReader(channel, allocator)) {
+      final MessageResult message = messageReader.readNext();
+      if (message == null) {
+        throw new IOException("Unexpected end of input. Missing schema.");
+      }
+
+      if (message.getMessage().headerType() == MessageHeader.Schema) {
+        return MessageSerializer.deserializeSchema(message.getMessage());
+      }
+
+      throw new IOException("Expected schema but header was " + message.getMessage().headerType());
+    }
+  }
+
+  @Override
+  protected ArrowDictionaryBatch readDictionary() throws IOException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git a/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java
new file mode 100644
index 0000000..1dd9698
--- /dev/null
+++ b/java/adapter/orc/src/main/java/org/apache/arrow/adapter/orc/OrcStripeReaderJniWrapper.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+/**
+ * Static JNI entry points operating on a native orc stripe reader.
+ */
+class OrcStripeReaderJniWrapper {
+
+  /**
+   * Fetch the next record batch of the stripe.
+   * @param readerId id of the stripe reader instance.
+   * @return loaded record batch, return null when reached
+   *     the end of current stripe.
+   */
+  static native OrcRecordBatch next(long readerId);
+
+  /**
+   * Fetch the schema of the current stripe.
+   * @param readerId id of the stripe reader instance.
+   * @return serialized schema.
+   */
+  static native byte[] getSchema(long readerId);
+
+  /**
+   * Free the resources of the underlying reader.
+   * @param readerId id of the stripe reader instance.
+   */
+  static native void close(long readerId);
+}
diff --git a/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java b/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java
new file mode 100644
index 0000000..943b2cb
--- /dev/null
+++ b/java/adapter/orc/src/test/java/org/apache/arrow/adapter/orc/OrcReaderTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.adapter.orc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+
+/**
+ * Writes a small orc file with the orc java writer and reads it back
+ * through the JNI based {@link OrcReader}.
+ */
+public class OrcReaderTest {
+
+  @Rule
+  public TemporaryFolder testFolder = new TemporaryFolder();
+
+  private static final int MAX_ALLOCATION = 8 * 1024;
+  private static RootAllocator allocator;
+
+  @BeforeClass
+  public static void beforeClass() {
+    allocator = new RootAllocator(MAX_ALLOCATION);
+  }
+
+  @Test
+  public void testOrcJniReader() throws Exception {
+    TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");
+    File testFile = new File(testFolder.getRoot(), "test-orc");
+
+    // write 1024 rows: x = row index, y = "Last-" + 3 * row index
+    Writer writer = OrcFile.createWriter(new Path(testFile.getAbsolutePath()),
+        OrcFile.writerOptions(new Configuration()).setSchema(schema));
+    VectorizedRowBatch batch = schema.createRowBatch();
+    LongColumnVector longColumnVector = (LongColumnVector) batch.cols[0];
+    BytesColumnVector bytesColumnVector = (BytesColumnVector) batch.cols[1];
+    for (int r = 0; r < 1024; ++r) {
+      int row = batch.size++;
+      longColumnVector.vector[row] = r;
+      byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
+      bytesColumnVector.setRef(row, buffer, 0, buffer.length);
+    }
+    writer.addRowBatch(batch);
+    writer.close();
+
+    // try-with-resources closes both readers even when an assertion fails;
+    // the stripe reader is closed before the file reader, as before.
+    try (OrcReader reader = new OrcReader(testFile.getAbsolutePath(), allocator)) {
+      assertEquals(1, reader.getNumberOfStripes());
+
+      try (ArrowReader stripeReader = reader.nextStripeReader(1024)) {
+        VectorSchemaRoot schemaRoot = stripeReader.getVectorSchemaRoot();
+        stripeReader.loadNextBatch();
+
+        List<FieldVector> fields = schemaRoot.getFieldVectors();
+        assertEquals(2, fields.size());
+
+        IntVector intVector = (IntVector) fields.get(0);
+        VarCharVector varCharVector = (VarCharVector) fields.get(1);
+        for (int i = 0; i < 1024; ++i) {
+          assertEquals(i, intVector.get(i));
+          assertEquals("Last-" + (i * 3), new String(varCharVector.get(i), StandardCharsets.UTF_8));
+        }
+
+        assertFalse(stripeReader.loadNextBatch());
+        assertNull(reader.nextStripeReader(1024));
+      }
+    }
+  }
+}
diff --git a/java/pom.xml b/java/pom.xml
index 6661625..0cdb2a3 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -682,6 +682,21 @@
<module>gandiva</module>
</modules>
</profile>
+
+ <profile>
+ <!-- orc java depends on arrow cpp, and arrow cpp isn't enabled by default yet -->
+ <id>arrow-jni</id>
+ <modules>
+ <module>format</module>
+ <module>memory</module>
+ <module>vector</module>
+ <module>tools</module>
+ <module>adapter/jdbc</module>
+ <module>adapter/orc</module>
+ <module>plasma</module>
+ <module>flight</module>
+ </modules>
+ </profile>
</profiles>
</project>