You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2022/04/26 16:11:18 UTC
[arrow] branch master updated: ARROW-7272: [C++][Java][Dataset] JNI bridge between RecordBatch and VectorSchemaRoot
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new dc97883dee ARROW-7272: [C++][Java][Dataset] JNI bridge between RecordBatch and VectorSchemaRoot
dc97883dee is described below
commit dc97883dee25ba8da55c7591060c44de2ea00865
Author: Hongze Zhang <ho...@intel.com>
AuthorDate: Tue Apr 26 18:11:11 2022 +0200
ARROW-7272: [C++][Java][Dataset] JNI bridge between RecordBatch and VectorSchemaRoot
Added a simple utility API to share data between C++ and Java code. The methods call the C Data Interface API directly.
Updated the Java dataset code to use the new API instead of passing buffer pointers over JNI.
This is also a dependency of ARROW-11776 (PR #10201).
Closes #10883 from zhztheplayer/ARROW-7272
Authored-by: Hongze Zhang <ho...@intel.com>
Signed-off-by: Antoine Pitrou <an...@python.org>
---
cpp/src/jni/dataset/jni_util.cc | 27 ++++-
cpp/src/jni/dataset/jni_util.h | 18 ++-
cpp/src/jni/dataset/jni_util_test.cc | 3 +-
cpp/src/jni/dataset/jni_wrapper.cc | 110 ++++-------------
java/dataset/CMakeLists.txt | 1 -
java/dataset/pom.xml | 18 ++-
.../org/apache/arrow/dataset/file/FileFormat.java | 2 +-
.../org/apache/arrow/dataset/jni/JniWrapper.java | 10 +-
.../arrow/dataset/jni/NativeRecordBatchHandle.java | 106 ----------------
.../apache/arrow/dataset/jni/NativeScanTask.java | 3 +-
.../apache/arrow/dataset/jni/NativeScanner.java | 133 ++++++++++-----------
.../org/apache/arrow/dataset/scanner/ScanTask.java | 15 +--
.../arrow/memory/NativeUnderlyingMemory.java | 81 -------------
.../java/org/apache/arrow/dataset/TestDataset.java | 30 ++++-
.../arrow/dataset/file/TestFileSystemDataset.java | 40 +++----
.../arrow/memory/TestNativeUnderlyingMemory.java | 110 -----------------
java/pom.xml | 1 +
17 files changed, 194 insertions(+), 514 deletions(-)
diff --git a/cpp/src/jni/dataset/jni_util.cc b/cpp/src/jni/dataset/jni_util.cc
index 113669a4cf..aea65e5ee2 100644
--- a/cpp/src/jni/dataset/jni_util.cc
+++ b/cpp/src/jni/dataset/jni_util.cc
@@ -17,10 +17,13 @@
#include "jni/dataset/jni_util.h"
-#include "arrow/util/logging.h"
-
+#include <memory>
#include <mutex>
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/util/logging.h"
+
namespace arrow {
namespace dataset {
namespace jni {
@@ -162,6 +165,15 @@ std::shared_ptr<ReservationListener> ReservationListenableMemoryPool::get_listen
ReservationListenableMemoryPool::~ReservationListenableMemoryPool() {}
+Status CheckException(JNIEnv* env) {
+ if (env->ExceptionCheck()) {
+ env->ExceptionDescribe();
+ env->ExceptionClear();
+ return Status::Invalid("Error during calling Java code from native code");
+ }
+ return Status::OK();
+}
+
jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
jclass local_class = env->FindClass(class_name);
jclass global_class = (jclass)env->NewGlobalRef(local_class);
@@ -236,6 +248,17 @@ arrow::Result<std::shared_ptr<arrow::Schema>> FromSchemaByteArray(
env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT);
return schema;
}
+arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr<RecordBatch>& batch,
+ jlong struct_array) {
+ return arrow::ExportRecordBatch(*batch,
+ reinterpret_cast<struct ArrowArray*>(struct_array));
+}
+
+arrow::Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(
+ JNIEnv* env, const std::shared_ptr<Schema>& schema, jlong struct_array) {
+ return arrow::ImportRecordBatch(reinterpret_cast<struct ArrowArray*>(struct_array),
+ schema);
+}
} // namespace jni
} // namespace dataset
diff --git a/cpp/src/jni/dataset/jni_util.h b/cpp/src/jni/dataset/jni_util.h
index c76033ae63..552ce6f2aa 100644
--- a/cpp/src/jni/dataset/jni_util.h
+++ b/cpp/src/jni/dataset/jni_util.h
@@ -17,6 +17,8 @@
#pragma once
+#include <jni.h>
+
#include "arrow/array.h"
#include "arrow/io/api.h"
#include "arrow/ipc/api.h"
@@ -24,12 +26,12 @@
#include "arrow/result.h"
#include "arrow/type.h"
-#include <jni.h>
-
namespace arrow {
namespace dataset {
namespace jni {
+Status CheckException(JNIEnv* env);
+
jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name);
arrow::Result<jmethodID> GetMethodID(JNIEnv* env, jclass this_class, const char* name,
@@ -48,6 +50,18 @@ arrow::Result<jbyteArray> ToSchemaByteArray(JNIEnv* env,
arrow::Result<std::shared_ptr<arrow::Schema>> FromSchemaByteArray(JNIEnv* env,
jbyteArray schemaBytes);
+/// \brief Export arrow::RecordBatch for Java (or other JVM languages) use.
+/// The exported batch is subject to C data interface specification and can be
+/// imported from Java side using provided JNI utilities.
+arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr<RecordBatch>& batch,
+ jlong struct_array);
+
+/// \brief Import arrow::RecordBatch from JVM language side. The input data should
+/// ideally be exported from specific JNI utilities from JVM language side and should
+/// conform to C data interface specification.
+arrow::Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(
+ JNIEnv* env, const std::shared_ptr<Schema>& schema, jlong struct_array);
+
/// \brief Create a new shared_ptr on heap from shared_ptr t to prevent
/// the managed object from being garbage-collected.
///
diff --git a/cpp/src/jni/dataset/jni_util_test.cc b/cpp/src/jni/dataset/jni_util_test.cc
index 589f00b1cc..eec1ad245a 100644
--- a/cpp/src/jni/dataset/jni_util_test.cc
+++ b/cpp/src/jni/dataset/jni_util_test.cc
@@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+#include "jni/dataset/jni_util.h"
+
#include <gtest/gtest.h>
#include "arrow/memory_pool.h"
#include "arrow/testing/gtest_util.h"
-#include "jni/dataset/jni_util.h"
namespace arrow {
namespace dataset {
diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc
index 041542804c..1e5c7a8aa7 100644
--- a/cpp/src/jni/dataset/jni_wrapper.cc
+++ b/cpp/src/jni/dataset/jni_wrapper.cc
@@ -24,9 +24,7 @@
#include "arrow/filesystem/localfs.h"
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"
-
#include "jni/dataset/jni_util.h"
-
#include "org_apache_arrow_dataset_file_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
@@ -37,14 +35,8 @@ jclass illegal_access_exception_class;
jclass illegal_argument_exception_class;
jclass runtime_exception_class;
-jclass record_batch_handle_class;
-jclass record_batch_handle_field_class;
-jclass record_batch_handle_buffer_class;
jclass java_reservation_listener_class;
-jmethodID record_batch_handle_constructor;
-jmethodID record_batch_handle_field_constructor;
-jmethodID record_batch_handle_buffer_constructor;
jmethodID reserve_memory_method;
jmethodID unreserve_memory_method;
@@ -100,11 +92,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
return arrow::Status::Invalid("JNIEnv was not attached to current thread");
}
env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size);
- if (env->ExceptionCheck()) {
- env->ExceptionDescribe();
- env->ExceptionClear();
- return arrow::Status::Invalid("Error calling Java side reservation listener");
- }
+ RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
}
@@ -114,11 +102,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
return arrow::Status::Invalid("JNIEnv was not attached to current thread");
}
env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size);
- if (env->ExceptionCheck()) {
- env->ExceptionDescribe();
- env->ExceptionClear();
- return arrow::Status::Invalid("Error calling Java side reservation listener");
- }
+ RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
return arrow::Status::OK();
}
@@ -206,33 +190,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
runtime_exception_class =
CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;");
- record_batch_handle_class =
- CreateGlobalClassReference(env,
- "Lorg/apache/arrow/"
- "dataset/jni/NativeRecordBatchHandle;");
- record_batch_handle_field_class =
- CreateGlobalClassReference(env,
- "Lorg/apache/arrow/"
- "dataset/jni/NativeRecordBatchHandle$Field;");
- record_batch_handle_buffer_class =
- CreateGlobalClassReference(env,
- "Lorg/apache/arrow/"
- "dataset/jni/NativeRecordBatchHandle$Buffer;");
java_reservation_listener_class =
CreateGlobalClassReference(env,
"Lorg/apache/arrow/"
"dataset/jni/ReservationListener;");
-
- record_batch_handle_constructor =
- JniGetOrThrow(GetMethodID(env, record_batch_handle_class, "<init>",
- "(J[Lorg/apache/arrow/dataset/"
- "jni/NativeRecordBatchHandle$Field;"
- "[Lorg/apache/arrow/dataset/"
- "jni/NativeRecordBatchHandle$Buffer;)V"));
- record_batch_handle_field_constructor =
- JniGetOrThrow(GetMethodID(env, record_batch_handle_field_class, "<init>", "(JJ)V"));
- record_batch_handle_buffer_constructor = JniGetOrThrow(
- GetMethodID(env, record_batch_handle_buffer_class, "<init>", "(JJJJ)V"));
reserve_memory_method =
JniGetOrThrow(GetMethodID(env, java_reservation_listener_class, "reserve", "(J)V"));
unreserve_memory_method = JniGetOrThrow(
@@ -250,9 +211,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
env->DeleteGlobalRef(illegal_access_exception_class);
env->DeleteGlobalRef(illegal_argument_exception_class);
env->DeleteGlobalRef(runtime_exception_class);
- env->DeleteGlobalRef(record_batch_handle_class);
- env->DeleteGlobalRef(record_batch_handle_field_class);
- env->DeleteGlobalRef(record_batch_handle_buffer_class);
env->DeleteGlobalRef(java_reservation_listener_class);
default_memory_pool_id = -1L;
@@ -458,10 +416,10 @@ Java_org_apache_arrow_dataset_jni_JniWrapper_getSchemaFromScanner(JNIEnv* env, j
/*
* Class: org_apache_arrow_dataset_jni_JniWrapper
* Method: nextRecordBatch
- * Signature: (J)Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle;
+ * Signature: (JJ)Z
*/
-JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch(
- JNIEnv* env, jobject, jlong scanner_id) {
+JNIEXPORT jboolean JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch(
+ JNIEnv* env, jobject, jlong scanner_id, jlong struct_array) {
JNI_METHOD_START
std::shared_ptr<DisposableScannerAdaptor> scanner_adaptor =
RetrieveNativeInstance<DisposableScannerAdaptor>(scanner_id);
@@ -469,14 +427,10 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecor
std::shared_ptr<arrow::RecordBatch> record_batch =
JniGetOrThrow(scanner_adaptor->Next());
if (record_batch == nullptr) {
- return nullptr; // stream ended
+ return false; // stream ended
}
- std::shared_ptr<arrow::Schema> schema = record_batch->schema();
- jobjectArray field_array =
- env->NewObjectArray(schema->num_fields(), record_batch_handle_field_class, nullptr);
-
- std::vector<std::shared_ptr<arrow::Buffer>> buffers;
- for (int i = 0; i < schema->num_fields(); ++i) {
+ std::vector<std::shared_ptr<arrow::Array>> offset_zeroed_arrays;
+ for (int i = 0; i < record_batch->num_columns(); ++i) {
// TODO: If the array has an offset then we need to de-offset the array
// in order for it to be properly consumed on the Java end.
// This forces a copy, it would be nice to avoid this if Java
@@ -485,44 +439,22 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecor
//
// Generally a non-zero offset will occur whenever the scanner batch
// size is smaller than the batch size of the underlying files.
- auto column = record_batch->column(i);
- if (column->offset() != 0) {
- column = JniGetOrThrow(arrow::Concatenate({column}));
- }
- auto dataArray = column->data();
- jobject field = env->NewObject(record_batch_handle_field_class,
- record_batch_handle_field_constructor,
- column->length(), column->null_count());
- env->SetObjectArrayElement(field_array, i, field);
-
- for (auto& buffer : dataArray->buffers) {
- buffers.push_back(buffer);
+ std::shared_ptr<arrow::Array> array = record_batch->column(i);
+ if (array->offset() == 0) {
+ offset_zeroed_arrays.push_back(array);
+ continue;
}
+ std::shared_ptr<arrow::Array> offset_zeroed =
+ JniGetOrThrow(arrow::Concatenate({array}));
+ offset_zeroed_arrays.push_back(offset_zeroed);
}
- jobjectArray buffer_array =
- env->NewObjectArray(buffers.size(), record_batch_handle_buffer_class, nullptr);
-
- for (size_t j = 0; j < buffers.size(); ++j) {
- auto buffer = buffers[j];
- uint8_t* data = nullptr;
- int64_t size = 0;
- int64_t capacity = 0;
- if (buffer != nullptr) {
- data = (uint8_t*)buffer->data();
- size = buffer->size();
- capacity = buffer->capacity();
- }
- jobject buffer_handle = env->NewObject(record_batch_handle_buffer_class,
- record_batch_handle_buffer_constructor,
- CreateNativeRef(buffer), data, size, capacity);
- env->SetObjectArrayElement(buffer_array, j, buffer_handle);
- }
-
- jobject ret = env->NewObject(record_batch_handle_class, record_batch_handle_constructor,
- record_batch->num_rows(), field_array, buffer_array);
- return ret;
- JNI_METHOD_END(nullptr)
+ std::shared_ptr<arrow::RecordBatch> offset_zeroed_batch = arrow::RecordBatch::Make(
+ record_batch->schema(), record_batch->num_rows(), offset_zeroed_arrays);
+ JniAssertOkOrThrow(
+ arrow::dataset::jni::ExportRecordBatch(env, offset_zeroed_batch, struct_array));
+ return true;
+ JNI_METHOD_END(false)
}
/*
diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt
index 07e2d0ae8f..5b6e4a9ce2 100644
--- a/java/dataset/CMakeLists.txt
+++ b/java/dataset/CMakeLists.txt
@@ -33,7 +33,6 @@ message("generating headers to ${JNI_HEADERS_DIR}")
add_jar(arrow_dataset_java
src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
- src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml
index 9a80a547c1..c39f5e2896 100644
--- a/java/dataset/pom.xml
+++ b/java/dataset/pom.xml
@@ -44,6 +44,12 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-c-data</artifactId>
+ <version>${project.version}</version>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
@@ -56,6 +62,12 @@
<version>${parquet.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
@@ -86,12 +98,6 @@
</exclusion>
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- <version>${avro.version}</version>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
index e341d46bea..107fc2f71d 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
@@ -24,7 +24,7 @@ public enum FileFormat {
PARQUET(0),
NONE(-1);
- private int id;
+ private final int id;
FileFormat(int id) {
this.id = id;
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
index 7dd54e7648..e10eda0b45 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
@@ -87,21 +87,25 @@ public class JniWrapper {
/**
* Release the Scanner by destroying its reference held by JNI wrapper.
+ *
* @param scannerId the native pointer of the arrow::dataset::Scanner instance.
*/
public native void closeScanner(long scannerId);
/**
* Read next record batch from the specified scanner.
+ *
* @param scannerId the native pointer of the arrow::dataset::Scanner instance.
- * @return an instance of {@link NativeRecordBatchHandle} describing the overall layout of the native record batch.
+ * @param arrowArray pointer to an empty {@link org.apache.arrow.c.ArrowArray} struct to
+ * store C++ side record batch that conforms to C data interface.
+ * @return true if valid record batch is returned; false if stream ended.
*/
- public native NativeRecordBatchHandle nextRecordBatch(long scannerId);
+ public native boolean nextRecordBatch(long scannerId, long arrowArray);
/**
* Release the Buffer by destroying its reference held by JNI wrapper.
+ *
* @param bufferId the native pointer of the arrow::Buffer instance.
*/
public native void releaseBuffer(long bufferId);
-
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
deleted file mode 100644
index dd90fd1c1d..0000000000
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.dataset.jni;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Hold pointers to a Arrow C++ RecordBatch.
- */
-public class NativeRecordBatchHandle {
-
- private final long numRows;
- private final List<Field> fields;
- private final List<Buffer> buffers;
-
- /**
- * Constructor.
- *
- * @param numRows Total row number of the associated RecordBatch
- * @param fields Metadata of fields
- * @param buffers Retained Arrow buffers
- */
- public NativeRecordBatchHandle(long numRows, Field[] fields, Buffer[] buffers) {
- this.numRows = numRows;
- this.fields = Arrays.asList(fields);
- this.buffers = Arrays.asList(buffers);
- }
-
- /**
- * Returns the total row number of the associated RecordBatch.
- * @return Total row number of the associated RecordBatch.
- */
- public long getNumRows() {
- return numRows;
- }
-
- /**
- * Returns Metadata of fields.
- * @return Metadata of fields.
- */
- public List<Field> getFields() {
- return fields;
- }
-
- /**
- * Returns the buffers.
- * @return Retained Arrow buffers.
- */
- public List<Buffer> getBuffers() {
- return buffers;
- }
-
- /**
- * Field metadata.
- */
- public static class Field {
- public final long length;
- public final long nullCount;
-
- public Field(long length, long nullCount) {
- this.length = length;
- this.nullCount = nullCount;
- }
- }
-
- /**
- * Pointers and metadata of the targeted Arrow buffer.
- */
- public static class Buffer {
- public final long nativeInstanceId;
- public final long memoryAddress;
- public final long size;
- public final long capacity;
-
- /**
- * Constructor.
- *
- * @param nativeInstanceId Native instance's id
- * @param memoryAddress Memory address of the first byte
- * @param size Size (in bytes)
- * @param capacity Capacity (in bytes)
- */
- public Buffer(long nativeInstanceId, long memoryAddress, long size, long capacity) {
- this.nativeInstanceId = nativeInstanceId;
- this.memoryAddress = memoryAddress;
- this.size = size;
- this.capacity = capacity;
- }
- }
-}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
index 14d89c2ee7..e4764236da 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
@@ -18,6 +18,7 @@
package org.apache.arrow.dataset.jni;
import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.vector.ipc.ArrowReader;
/**
* Native implementation of {@link ScanTask}. Currently RecordBatches are iterated directly by the scanner
@@ -35,7 +36,7 @@ public class NativeScanTask implements ScanTask {
}
@Override
- public BatchIterator execute() {
+ public ArrowReader execute() {
return scanner.execute();
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
index 24c298067a..de18f9e5e0 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
@@ -18,23 +18,19 @@
package org.apache.arrow.dataset.jni;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.Data;
import org.apache.arrow.dataset.scanner.Scanner;
-import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.BufferLedger;
-import org.apache.arrow.memory.NativeUnderlyingMemory;
-import org.apache.arrow.memory.util.LargeMemoryUtil;
-import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.SchemaUtility;
@@ -60,7 +56,7 @@ public class NativeScanner implements Scanner {
this.scannerId = scannerId;
}
- ScanTask.BatchIterator execute() {
+ ArrowReader execute() {
if (closed) {
throw new NativeInstanceReleasedException();
}
@@ -68,67 +64,7 @@ public class NativeScanner implements Scanner {
throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. Consider creating " +
"new scanner instead");
}
- return new ScanTask.BatchIterator() {
- private ArrowRecordBatch peek = null;
-
- @Override
- public void close() {
- NativeScanner.this.close();
- }
-
- @Override
- public boolean hasNext() {
- if (peek != null) {
- return true;
- }
- final NativeRecordBatchHandle handle;
- readLock.lock();
- try {
- if (closed) {
- throw new NativeInstanceReleasedException();
- }
- handle = JniWrapper.get().nextRecordBatch(scannerId);
- } finally {
- readLock.unlock();
- }
- if (handle == null) {
- return false;
- }
- final ArrayList<ArrowBuf> buffers = new ArrayList<>();
- for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) {
- final BufferAllocator allocator = context.getAllocator();
- final int size = LargeMemoryUtil.checkedCastToInt(buffer.size);
- final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
- size, buffer.nativeInstanceId, buffer.memoryAddress);
- BufferLedger ledger = am.associate(allocator);
- ArrowBuf buf = new ArrowBuf(ledger, null, size, buffer.memoryAddress);
- buffers.add(buf);
- }
-
- try {
- final int numRows = LargeMemoryUtil.checkedCastToInt(handle.getNumRows());
- peek = new ArrowRecordBatch(numRows, handle.getFields().stream()
- .map(field -> new ArrowFieldNode(field.length, field.nullCount))
- .collect(Collectors.toList()), buffers);
- return true;
- } finally {
- buffers.forEach(buffer -> buffer.getReferenceManager().release());
- }
- }
-
- @Override
- public ArrowRecordBatch next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- try {
- return peek;
- } finally {
- peek = null;
- }
- }
- };
+ return new NativeReader(context.getAllocator());
}
@Override
@@ -167,4 +103,59 @@ public class NativeScanner implements Scanner {
writeLock.unlock();
}
}
+
+ /**
+ * {@link ArrowReader} implementation for NativeDataset.
+ */
+ public class NativeReader extends ArrowReader {
+
+ private NativeReader(BufferAllocator allocator) {
+ super(allocator);
+ }
+
+ @Override
+ protected void loadRecordBatch(ArrowRecordBatch batch) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean loadNextBatch() throws IOException {
+ readLock.lock();
+ try {
+ if (closed) {
+ throw new NativeInstanceReleasedException();
+ }
+ try (ArrowArray arrowArray = ArrowArray.allocateNew(context.getAllocator())) {
+ if (!JniWrapper.get().nextRecordBatch(scannerId, arrowArray.memoryAddress())) {
+ return false;
+ }
+ final VectorSchemaRoot vsr = getVectorSchemaRoot();
+ Data.importIntoVectorSchemaRoot(context.getAllocator(), arrowArray, vsr, this);
+ }
+ } finally {
+ readLock.unlock();
+ }
+ return true;
+ }
+
+ @Override
+ public long bytesRead() {
+ return 0L;
+ }
+
+ @Override
+ protected void closeReadSource() throws IOException {
+ // no-op
+ }
+
+ @Override
+ protected Schema readSchema() throws IOException {
+ return schema();
+ }
+ }
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
index d07036a61e..434f5c9a6f 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
@@ -17,9 +17,9 @@
package org.apache.arrow.dataset.scanner;
-import java.util.Iterator;
+import java.io.Reader;
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.ArrowReader;
/**
* Read record batches from a range of a single data fragment. A
@@ -29,14 +29,7 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
public interface ScanTask extends AutoCloseable {
/**
- * Creates and returns a {@link BatchIterator} instance.
+ * Execute this ScanTask and return a {@link Reader} instance.
*/
- BatchIterator execute();
-
- /**
- * The iterator implementation for {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}s.
- */
- interface BatchIterator extends Iterator<ArrowRecordBatch>, AutoCloseable {
-
- }
+ ArrowReader execute();
}
diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
deleted file mode 100644
index 963fb61704..0000000000
--- a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import org.apache.arrow.dataset.jni.JniWrapper;
-
-/**
- * AllocationManager implementation for native allocated memory.
- */
-public class NativeUnderlyingMemory extends AllocationManager {
-
- private final int size;
- private final long nativeInstanceId;
- private final long address;
-
- /**
- * Constructor.
- *
- * @param accountingAllocator The accounting allocator instance
- * @param size Size of underlying memory (in bytes)
- * @param nativeInstanceId ID of the native instance
- */
- NativeUnderlyingMemory(BufferAllocator accountingAllocator, int size, long nativeInstanceId, long address) {
- super(accountingAllocator);
- this.size = size;
- this.nativeInstanceId = nativeInstanceId;
- this.address = address;
- // pre-allocate bytes on accounting allocator
- final AllocationListener listener = accountingAllocator.getListener();
- try (final AllocationReservation reservation = accountingAllocator.newReservation()) {
- listener.onPreAllocation(size);
- reservation.reserve(size);
- listener.onAllocation(size);
- } catch (Exception e) {
- release0();
- throw e;
- }
- }
-
- /**
- * Alias to constructor.
- */
- public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int size, long nativeInstanceId,
- long address) {
- return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address);
- }
-
- public BufferLedger associate(BufferAllocator allocator) {
- return super.associate(allocator);
- }
-
- @Override
- protected void release0() {
- JniWrapper.get().releaseBuffer(nativeInstanceId);
- }
-
- @Override
- public long getSize() {
- return size;
- }
-
- @Override
- protected long memoryAddress() {
- return address;
- }
-}
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
index 51dac15e56..15224534d2 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
@@ -17,6 +17,8 @@
package org.apache.arrow.dataset;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
@@ -26,11 +28,15 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.After;
@@ -56,15 +62,31 @@ public abstract class TestDataset {
protected List<ArrowRecordBatch> collectResultFromFactory(DatasetFactory factory, ScanOptions options) {
final Dataset dataset = factory.finish();
final Scanner scanner = dataset.newScan(options);
- final List<ArrowRecordBatch> ret = stream(scanner.scan())
- .flatMap(t -> stream(t.execute()))
- .collect(Collectors.toList());
try {
+ final List<ArrowRecordBatch> ret = stream(scanner.scan())
+ .flatMap(t -> stream(collectTaskData(t)))
+ .collect(Collectors.toList());
AutoCloseables.close(scanner, dataset);
+ return ret;
+ } catch (RuntimeException e) {
+ throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
- return ret;
+ }
+
+ protected List<ArrowRecordBatch> collectTaskData(ScanTask scanTask) {
+ try (ArrowReader reader = scanTask.execute()) {
+ List<ArrowRecordBatch> batches = new ArrayList<>();
+ while (reader.loadNextBatch()) {
+ VectorSchemaRoot root = reader.getVectorSchemaRoot();
+ final VectorUnloader unloader = new VectorUnloader(root);
+ batches.add(unloader.getRecordBatch());
+ }
+ return batches;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
protected Schema inferResultSchemaFromFactory(DatasetFactory factory, ScanOptions options) {
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
index 83d57c7421..92610b1145 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -44,11 +44,11 @@ import org.apache.arrow.dataset.jni.NativeScanTask;
import org.apache.arrow.dataset.jni.NativeScanner;
import org.apache.arrow.dataset.jni.TestNativeDataset;
import org.apache.arrow.dataset.scanner.ScanOptions;
-import org.apache.arrow.dataset.scanner.ScanTask;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Schema;
@@ -90,6 +90,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
AutoCloseables.close(datum);
+ AutoCloseables.close(factory);
}
@Test
@@ -116,6 +117,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
.build()), datum);
AutoCloseables.close(datum);
+ AutoCloseables.close(factory);
}
@Test
@@ -135,6 +137,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
AutoCloseables.close(datum);
+ AutoCloseables.close(factory);
}
@Test
@@ -217,6 +220,8 @@ public class TestFileSystemDataset extends TestNativeDataset {
dataset.close();
dataset.close();
});
+
+ AutoCloseables.close(factory);
}
@Test
@@ -232,13 +237,14 @@ public class TestFileSystemDataset extends TestNativeDataset {
List<? extends NativeScanTask> taskList2 = collect(scanner.scan());
NativeScanTask task1 = taskList1.get(0);
NativeScanTask task2 = taskList2.get(0);
- List<ArrowRecordBatch> datum = collect(task1.execute());
+ List<ArrowRecordBatch> datum = collectTaskData(task1);
+
+ AutoCloseables.close(datum);
UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute);
Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead",
uoe.getMessage());
- AutoCloseables.close(datum);
AutoCloseables.close(taskList1);
AutoCloseables.close(taskList2);
AutoCloseables.close(scanner, dataset, factory);
@@ -256,7 +262,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
NativeScanner scanner = dataset.newScan(options);
List<? extends NativeScanTask> taskList = collect(scanner.scan());
NativeScanTask task = taskList.get(0);
- List<ArrowRecordBatch> datum = executor.submit(() -> collect(task.execute())).get();
+ List<ArrowRecordBatch> datum = executor.submit(() -> collectTaskData(task)).get();
AutoCloseables.close(datum);
AutoCloseables.close(taskList);
@@ -274,6 +280,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
NativeScanner scanner = dataset.newScan(options);
scanner.close();
assertThrows(NativeInstanceReleasedException.class, scanner::scan);
+ AutoCloseables.close(factory);
}
@Test
@@ -289,6 +296,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
NativeScanTask task = tasks.get(0);
task.close();
assertThrows(NativeInstanceReleasedException.class, task::execute);
+ AutoCloseables.close(factory);
}
@Test
@@ -302,28 +310,10 @@ public class TestFileSystemDataset extends TestNativeDataset {
NativeScanner scanner = dataset.newScan(options);
List<? extends NativeScanTask> tasks = collect(scanner.scan());
NativeScanTask task = tasks.get(0);
- ScanTask.BatchIterator iterator = task.execute();
+ ArrowReader reader = task.execute();
task.close();
- assertThrows(NativeInstanceReleasedException.class, iterator::hasNext);
- }
-
- @Test
- public void testMemoryAllocationOnAssociatedAllocator() throws Exception {
- ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
- FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
- FileFormat.PARQUET, writeSupport.getOutputURI());
- ScanOptions options = new ScanOptions(100);
- long initReservation = rootAllocator().getAllocatedMemory();
- List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
- final long expected_diff = datum.stream()
- .flatMapToLong(batch -> batch.getBuffers()
- .stream()
- .mapToLong(buf -> buf.getReferenceManager().getAccountedSize())).sum();
- long reservation = rootAllocator().getAllocatedMemory();
- AutoCloseables.close(datum);
- long finalReservation = rootAllocator().getAllocatedMemory();
- Assert.assertEquals(expected_diff, reservation - initReservation);
- Assert.assertEquals(-expected_diff, finalReservation - reservation);
+ assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch);
+ AutoCloseables.close(factory);
}
private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
diff --git a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java b/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
deleted file mode 100644
index c81868e42b..0000000000
--- a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import static org.junit.Assert.*;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestNativeUnderlyingMemory {
-
- private RootAllocator allocator = null;
-
- @Before
- public void setUp() {
- allocator = new RootAllocator(Long.MAX_VALUE);
- }
-
- @After
- public void tearDown() {
- allocator.close();
- }
-
- protected RootAllocator rootAllocator() {
- return allocator;
- }
-
- @Test
- public void testReservation() {
- final RootAllocator root = rootAllocator();
-
- final int size = 512;
- final AllocationManager am = new MockUnderlyingMemory(root, size);
- final BufferLedger ledger = am.associate(root);
-
- assertEquals(size, root.getAllocatedMemory());
-
- ledger.release();
- }
-
- @Test
- public void testBufferTransfer() {
- final RootAllocator root = rootAllocator();
-
- ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
- ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
- assertEquals(0, allocator1.getAllocatedMemory());
- assertEquals(0, allocator2.getAllocatedMemory());
-
- final int size = 512;
- final AllocationManager am = new MockUnderlyingMemory(allocator1, size);
-
- final BufferLedger owningLedger = am.associate(allocator1);
- assertEquals(size, owningLedger.getAccountedSize());
- assertEquals(size, owningLedger.getSize());
- assertEquals(size, allocator1.getAllocatedMemory());
-
- final BufferLedger transferredLedger = am.associate(allocator2);
- owningLedger.release(); // release previous owner
- assertEquals(0, owningLedger.getAccountedSize());
- assertEquals(size, owningLedger.getSize());
- assertEquals(size, transferredLedger.getAccountedSize());
- assertEquals(size, transferredLedger.getSize());
- assertEquals(0, allocator1.getAllocatedMemory());
- assertEquals(size, allocator2.getAllocatedMemory());
-
- transferredLedger.release();
- allocator1.close();
- allocator2.close();
- }
-
- /**
- * A mock class of {@link NativeUnderlyingMemory} for unit testing about size-related operations.
- */
- private static class MockUnderlyingMemory extends NativeUnderlyingMemory {
-
- /**
- * Constructor.
- */
- MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) {
- super(accountingAllocator, size, -1L, -1L);
- }
-
- @Override
- protected void release0() {
- System.out.println("Underlying memory released. Size: " + getSize());
- }
-
- @Override
- protected long memoryAddress() {
- throw new UnsupportedOperationException();
- }
- }
-}
diff --git a/java/pom.xml b/java/pom.xml
index edcaac6b87..c74023c9e7 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -716,6 +716,7 @@
<module>adapter/orc</module>
<module>gandiva</module>
<module>dataset</module>
+ <module>c</module>
</modules>
</profile>