Posted to commits@arrow.apache.org by ap...@apache.org on 2022/04/26 16:11:18 UTC

[arrow] branch master updated: ARROW-7272: [C++][Java][Dataset] JNI bridge between RecordBatch and VectorSchemaRoot

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new dc97883dee ARROW-7272: [C++][Java][Dataset] JNI bridge between RecordBatch and VectorSchemaRoot
dc97883dee is described below

commit dc97883dee25ba8da55c7591060c44de2ea00865
Author: Hongze Zhang <ho...@intel.com>
AuthorDate: Tue Apr 26 18:11:11 2022 +0200

    ARROW-7272: [C++][Java][Dataset] JNI bridge between RecordBatch and VectorSchemaRoot
    
    Added a simple utility API for sharing data between C++ and Java code. The methods call the C Data Interface API directly.
    
    Updated the Java dataset code to use the new API instead of passing buffer pointers over JNI.
    
    This is also a dependency of ARROW-11776 (PR #10201).
    
    Closes #10883 from zhztheplayer/ARROW-7272
    
    Authored-by: Hongze Zhang <ho...@intel.com>
    Signed-off-by: Antoine Pitrou <an...@python.org>
---
 cpp/src/jni/dataset/jni_util.cc                    |  27 ++++-
 cpp/src/jni/dataset/jni_util.h                     |  18 ++-
 cpp/src/jni/dataset/jni_util_test.cc               |   3 +-
 cpp/src/jni/dataset/jni_wrapper.cc                 | 110 ++++-------------
 java/dataset/CMakeLists.txt                        |   1 -
 java/dataset/pom.xml                               |  18 ++-
 .../org/apache/arrow/dataset/file/FileFormat.java  |   2 +-
 .../org/apache/arrow/dataset/jni/JniWrapper.java   |  10 +-
 .../arrow/dataset/jni/NativeRecordBatchHandle.java | 106 ----------------
 .../apache/arrow/dataset/jni/NativeScanTask.java   |   3 +-
 .../apache/arrow/dataset/jni/NativeScanner.java    | 133 ++++++++++-----------
 .../org/apache/arrow/dataset/scanner/ScanTask.java |  15 +--
 .../arrow/memory/NativeUnderlyingMemory.java       |  81 -------------
 .../java/org/apache/arrow/dataset/TestDataset.java |  30 ++++-
 .../arrow/dataset/file/TestFileSystemDataset.java  |  40 +++----
 .../arrow/memory/TestNativeUnderlyingMemory.java   | 110 -----------------
 java/pom.xml                                       |   1 +
 17 files changed, 194 insertions(+), 514 deletions(-)
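
For context: the scan read path now goes through the standard ArrowReader API
instead of the old hand-rolled BatchIterator. A minimal consumer sketch (the
ScanTask is assumed to come from a scanner; this mirrors the collectTaskData
helper added to the tests below):

    import java.io.IOException;

    import org.apache.arrow.dataset.scanner.ScanTask;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.ipc.ArrowReader;

    // Sketch: consume one ScanTask through the new ArrowReader-based API.
    static void printRowCounts(ScanTask scanTask) throws IOException {
      try (ArrowReader reader = scanTask.execute()) {
        while (reader.loadNextBatch()) {
          // Each call loads the next batch into one reused VectorSchemaRoot.
          VectorSchemaRoot root = reader.getVectorSchemaRoot();
          System.out.println("rows: " + root.getRowCount());
        }
      }
    }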

diff --git a/cpp/src/jni/dataset/jni_util.cc b/cpp/src/jni/dataset/jni_util.cc
index 113669a4cf..aea65e5ee2 100644
--- a/cpp/src/jni/dataset/jni_util.cc
+++ b/cpp/src/jni/dataset/jni_util.cc
@@ -17,10 +17,13 @@
 
 #include "jni/dataset/jni_util.h"
 
-#include "arrow/util/logging.h"
-
+#include <memory>
 #include <mutex>
 
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/util/logging.h"
+
 namespace arrow {
 namespace dataset {
 namespace jni {
@@ -162,6 +165,15 @@ std::shared_ptr<ReservationListener> ReservationListenableMemoryPool::get_listen
 
 ReservationListenableMemoryPool::~ReservationListenableMemoryPool() {}
 
+Status CheckException(JNIEnv* env) {
+  if (env->ExceptionCheck()) {
+    env->ExceptionDescribe();
+    env->ExceptionClear();
+    return Status::Invalid("Error calling Java code from native code");
+  }
+  return Status::OK();
+}
+
 jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) {
   jclass local_class = env->FindClass(class_name);
   jclass global_class = (jclass)env->NewGlobalRef(local_class);
@@ -236,6 +248,17 @@ arrow::Result<std::shared_ptr<arrow::Schema>> FromSchemaByteArray(
   env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT);
   return schema;
 }
+arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr<RecordBatch>& batch,
+                                jlong struct_array) {
+  return arrow::ExportRecordBatch(*batch,
+                                  reinterpret_cast<struct ArrowArray*>(struct_array));
+}
+
+arrow::Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(
+    JNIEnv* env, const std::shared_ptr<Schema>& schema, jlong struct_array) {
+  return arrow::ImportRecordBatch(reinterpret_cast<struct ArrowArray*>(struct_array),
+                                  schema);
+}
 
 }  // namespace jni
 }  // namespace dataset
diff --git a/cpp/src/jni/dataset/jni_util.h b/cpp/src/jni/dataset/jni_util.h
index c76033ae63..552ce6f2aa 100644
--- a/cpp/src/jni/dataset/jni_util.h
+++ b/cpp/src/jni/dataset/jni_util.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <jni.h>
+
 #include "arrow/array.h"
 #include "arrow/io/api.h"
 #include "arrow/ipc/api.h"
@@ -24,12 +26,12 @@
 #include "arrow/result.h"
 #include "arrow/type.h"
 
-#include <jni.h>
-
 namespace arrow {
 namespace dataset {
 namespace jni {
 
+Status CheckException(JNIEnv* env);
+
 jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name);
 
 arrow::Result<jmethodID> GetMethodID(JNIEnv* env, jclass this_class, const char* name,
@@ -48,6 +50,18 @@ arrow::Result<jbyteArray> ToSchemaByteArray(JNIEnv* env,
 arrow::Result<std::shared_ptr<arrow::Schema>> FromSchemaByteArray(JNIEnv* env,
                                                                   jbyteArray schemaBytes);
 
+/// \brief Export an arrow::RecordBatch for use from Java (or other JVM languages).
+/// The exported batch conforms to the C data interface specification and can be
+/// imported on the Java side using the provided JNI utilities.
+arrow::Status ExportRecordBatch(JNIEnv* env, const std::shared_ptr<RecordBatch>& batch,
+                                jlong struct_array);
+
+/// \brief Import an arrow::RecordBatch from the JVM side. The input data should
+/// ideally have been exported by the corresponding JNI utilities on the JVM side
+/// and must conform to the C data interface specification.
+arrow::Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(
+    JNIEnv* env, const std::shared_ptr<Schema>& schema, jlong struct_array);
+
 /// \brief Create a new shared_ptr on heap from shared_ptr t to prevent
 /// the managed object from being garbage-collected.
 ///
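
The Java-side counterpart of these helpers is the arrow-c-data module, added
as a dependency below. The same export/import contract can be exercised
entirely in Java; a minimal round-trip sketch (not part of this commit,
assuming both roots share one schema and carry no dictionary-encoded fields,
hence the null DictionaryProviders):

    import org.apache.arrow.c.ArrowArray;
    import org.apache.arrow.c.Data;
    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.vector.VectorSchemaRoot;

    // Sketch: export `source` into a C data interface struct, then import it
    // into `target`; this stands in for the C++-to-Java handoff above.
    static void roundTrip(BufferAllocator allocator, VectorSchemaRoot source,
                          VectorSchemaRoot target) {
      try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator)) {
        Data.exportVectorSchemaRoot(allocator, source, null, arrowArray);
        Data.importIntoVectorSchemaRoot(allocator, arrowArray, target, null);
      }
    }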
diff --git a/cpp/src/jni/dataset/jni_util_test.cc b/cpp/src/jni/dataset/jni_util_test.cc
index 589f00b1cc..eec1ad245a 100644
--- a/cpp/src/jni/dataset/jni_util_test.cc
+++ b/cpp/src/jni/dataset/jni_util_test.cc
@@ -15,11 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "jni/dataset/jni_util.h"
+
 #include <gtest/gtest.h>
 
 #include "arrow/memory_pool.h"
 #include "arrow/testing/gtest_util.h"
-#include "jni/dataset/jni_util.h"
 
 namespace arrow {
 namespace dataset {
diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc
index 041542804c..1e5c7a8aa7 100644
--- a/cpp/src/jni/dataset/jni_wrapper.cc
+++ b/cpp/src/jni/dataset/jni_wrapper.cc
@@ -24,9 +24,7 @@
 #include "arrow/filesystem/localfs.h"
 #include "arrow/ipc/api.h"
 #include "arrow/util/iterator.h"
-
 #include "jni/dataset/jni_util.h"
-
 #include "org_apache_arrow_dataset_file_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
@@ -37,14 +35,8 @@ jclass illegal_access_exception_class;
 jclass illegal_argument_exception_class;
 jclass runtime_exception_class;
 
-jclass record_batch_handle_class;
-jclass record_batch_handle_field_class;
-jclass record_batch_handle_buffer_class;
 jclass java_reservation_listener_class;
 
-jmethodID record_batch_handle_constructor;
-jmethodID record_batch_handle_field_constructor;
-jmethodID record_batch_handle_buffer_constructor;
 jmethodID reserve_memory_method;
 jmethodID unreserve_memory_method;
 
@@ -100,11 +92,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
       return arrow::Status::Invalid("JNIEnv was not attached to current thread");
     }
     env->CallObjectMethod(java_reservation_listener_, reserve_memory_method, size);
-    if (env->ExceptionCheck()) {
-      env->ExceptionDescribe();
-      env->ExceptionClear();
-      return arrow::Status::Invalid("Error calling Java side reservation listener");
-    }
+    RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
     return arrow::Status::OK();
   }
 
@@ -114,11 +102,7 @@ class ReserveFromJava : public arrow::dataset::jni::ReservationListener {
       return arrow::Status::Invalid("JNIEnv was not attached to current thread");
     }
     env->CallObjectMethod(java_reservation_listener_, unreserve_memory_method, size);
-    if (env->ExceptionCheck()) {
-      env->ExceptionDescribe();
-      env->ExceptionClear();
-      return arrow::Status::Invalid("Error calling Java side reservation listener");
-    }
+    RETURN_NOT_OK(arrow::dataset::jni::CheckException(env));
     return arrow::Status::OK();
   }
 
@@ -206,33 +190,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   runtime_exception_class =
       CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;");
 
-  record_batch_handle_class =
-      CreateGlobalClassReference(env,
-                                 "Lorg/apache/arrow/"
-                                 "dataset/jni/NativeRecordBatchHandle;");
-  record_batch_handle_field_class =
-      CreateGlobalClassReference(env,
-                                 "Lorg/apache/arrow/"
-                                 "dataset/jni/NativeRecordBatchHandle$Field;");
-  record_batch_handle_buffer_class =
-      CreateGlobalClassReference(env,
-                                 "Lorg/apache/arrow/"
-                                 "dataset/jni/NativeRecordBatchHandle$Buffer;");
   java_reservation_listener_class =
       CreateGlobalClassReference(env,
                                  "Lorg/apache/arrow/"
                                  "dataset/jni/ReservationListener;");
-
-  record_batch_handle_constructor =
-      JniGetOrThrow(GetMethodID(env, record_batch_handle_class, "<init>",
-                                "(J[Lorg/apache/arrow/dataset/"
-                                "jni/NativeRecordBatchHandle$Field;"
-                                "[Lorg/apache/arrow/dataset/"
-                                "jni/NativeRecordBatchHandle$Buffer;)V"));
-  record_batch_handle_field_constructor =
-      JniGetOrThrow(GetMethodID(env, record_batch_handle_field_class, "<init>", "(JJ)V"));
-  record_batch_handle_buffer_constructor = JniGetOrThrow(
-      GetMethodID(env, record_batch_handle_buffer_class, "<init>", "(JJJJ)V"));
   reserve_memory_method =
       JniGetOrThrow(GetMethodID(env, java_reservation_listener_class, "reserve", "(J)V"));
   unreserve_memory_method = JniGetOrThrow(
@@ -250,9 +211,6 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   env->DeleteGlobalRef(illegal_access_exception_class);
   env->DeleteGlobalRef(illegal_argument_exception_class);
   env->DeleteGlobalRef(runtime_exception_class);
-  env->DeleteGlobalRef(record_batch_handle_class);
-  env->DeleteGlobalRef(record_batch_handle_field_class);
-  env->DeleteGlobalRef(record_batch_handle_buffer_class);
   env->DeleteGlobalRef(java_reservation_listener_class);
 
   default_memory_pool_id = -1L;
@@ -458,10 +416,10 @@ Java_org_apache_arrow_dataset_jni_JniWrapper_getSchemaFromScanner(JNIEnv* env, j
 /*
  * Class:     org_apache_arrow_dataset_jni_JniWrapper
  * Method:    nextRecordBatch
- * Signature: (J)Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle;
+ * Signature: (JJ)Z
  */
-JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch(
-    JNIEnv* env, jobject, jlong scanner_id) {
+JNIEXPORT jboolean JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch(
+    JNIEnv* env, jobject, jlong scanner_id, jlong struct_array) {
   JNI_METHOD_START
   std::shared_ptr<DisposableScannerAdaptor> scanner_adaptor =
       RetrieveNativeInstance<DisposableScannerAdaptor>(scanner_id);
@@ -469,14 +427,10 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecor
   std::shared_ptr<arrow::RecordBatch> record_batch =
       JniGetOrThrow(scanner_adaptor->Next());
   if (record_batch == nullptr) {
-    return nullptr;  // stream ended
+    return false;  // stream ended
   }
-  std::shared_ptr<arrow::Schema> schema = record_batch->schema();
-  jobjectArray field_array =
-      env->NewObjectArray(schema->num_fields(), record_batch_handle_field_class, nullptr);
-
-  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
-  for (int i = 0; i < schema->num_fields(); ++i) {
+  std::vector<std::shared_ptr<arrow::Array>> offset_zeroed_arrays;
+  for (int i = 0; i < record_batch->num_columns(); ++i) {
     // TODO: If the array has an offset then we need to de-offset the array
     // in order for it to be properly consumed on the Java end.
     // This forces a copy, it would be nice to avoid this if Java
@@ -485,44 +439,22 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecor
     //
     // Generally a non-zero offset will occur whenever the scanner batch
     // size is smaller than the batch size of the underlying files.
-    auto column = record_batch->column(i);
-    if (column->offset() != 0) {
-      column = JniGetOrThrow(arrow::Concatenate({column}));
-    }
-    auto dataArray = column->data();
-    jobject field = env->NewObject(record_batch_handle_field_class,
-                                   record_batch_handle_field_constructor,
-                                   column->length(), column->null_count());
-    env->SetObjectArrayElement(field_array, i, field);
-
-    for (auto& buffer : dataArray->buffers) {
-      buffers.push_back(buffer);
+    std::shared_ptr<arrow::Array> array = record_batch->column(i);
+    if (array->offset() == 0) {
+      offset_zeroed_arrays.push_back(array);
+      continue;
     }
+    std::shared_ptr<arrow::Array> offset_zeroed =
+        JniGetOrThrow(arrow::Concatenate({array}));
+    offset_zeroed_arrays.push_back(offset_zeroed);
   }
 
-  jobjectArray buffer_array =
-      env->NewObjectArray(buffers.size(), record_batch_handle_buffer_class, nullptr);
-
-  for (size_t j = 0; j < buffers.size(); ++j) {
-    auto buffer = buffers[j];
-    uint8_t* data = nullptr;
-    int64_t size = 0;
-    int64_t capacity = 0;
-    if (buffer != nullptr) {
-      data = (uint8_t*)buffer->data();
-      size = buffer->size();
-      capacity = buffer->capacity();
-    }
-    jobject buffer_handle = env->NewObject(record_batch_handle_buffer_class,
-                                           record_batch_handle_buffer_constructor,
-                                           CreateNativeRef(buffer), data, size, capacity);
-    env->SetObjectArrayElement(buffer_array, j, buffer_handle);
-  }
-
-  jobject ret = env->NewObject(record_batch_handle_class, record_batch_handle_constructor,
-                               record_batch->num_rows(), field_array, buffer_array);
-  return ret;
-  JNI_METHOD_END(nullptr)
+  std::shared_ptr<arrow::RecordBatch> offset_zeroed_batch = arrow::RecordBatch::Make(
+      record_batch->schema(), record_batch->num_rows(), offset_zeroed_arrays);
+  JniAssertOkOrThrow(
+      arrow::dataset::jni::ExportRecordBatch(env, offset_zeroed_batch, struct_array));
+  return true;
+  JNI_METHOD_END(false)
 }
 
 /*
diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt
index 07e2d0ae8f..5b6e4a9ce2 100644
--- a/java/dataset/CMakeLists.txt
+++ b/java/dataset/CMakeLists.txt
@@ -33,7 +33,6 @@ message("generating headers to ${JNI_HEADERS_DIR}")
 add_jar(arrow_dataset_java
         src/main/java/org/apache/arrow/dataset/jni/JniLoader.java
         src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
-        src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
         src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
         src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
         src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml
index 9a80a547c1..c39f5e2896 100644
--- a/java/dataset/pom.xml
+++ b/java/dataset/pom.xml
@@ -44,6 +44,12 @@
             <version>${project.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-c-data</artifactId>
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.arrow</groupId>
             <artifactId>arrow-memory-netty</artifactId>
@@ -56,6 +62,12 @@
             <version>${parquet.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-hadoop</artifactId>
@@ -86,12 +98,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.avro</groupId>
-            <artifactId>avro</artifactId>
-            <version>${avro.version}</version>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
index e341d46bea..107fc2f71d 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java
@@ -24,7 +24,7 @@ public enum FileFormat {
   PARQUET(0),
   NONE(-1);
 
-  private int id;
+  private final int id;
 
   FileFormat(int id) {
     this.id = id;
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
index 7dd54e7648..e10eda0b45 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java
@@ -87,21 +87,25 @@ public class JniWrapper {
 
   /**
    * Release the Scanner by destroying its reference held by JNI wrapper.
+   *
    * @param scannerId the native pointer of the arrow::dataset::Scanner instance.
    */
   public native void closeScanner(long scannerId);
 
   /**
    * Read next record batch from the specified scanner.
+   *
    * @param scannerId the native pointer of the arrow::dataset::Scanner instance.
-   * @return an instance of {@link NativeRecordBatchHandle} describing the overall layout of the native record batch.
+   * @param arrowArray pointer to an empty {@link org.apache.arrow.c.ArrowArray} struct
+   *                    to receive the C++-side record batch via the C data interface.
+   * @return true if a valid record batch was returned; false if the stream has ended.
    */
-  public native NativeRecordBatchHandle nextRecordBatch(long scannerId);
+  public native boolean nextRecordBatch(long scannerId, long arrowArray);
 
   /**
    * Release the Buffer by destroying its reference held by JNI wrapper.
+   *
    * @param bufferId the native pointer of the arrow::Buffer instance.
    */
   public native void releaseBuffer(long bufferId);
-
 }
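
The resulting call pattern, as a sketch (scannerId, allocator, root and
provider are assumed here; NativeScanner further down does exactly this):

    // Sketch: the Java side allocates an empty ArrowArray struct and passes
    // its memory address over JNI; the native side either exports a batch
    // into it (returning true) or signals end-of-stream (returning false).
    try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator)) {
      if (JniWrapper.get().nextRecordBatch(scannerId, arrowArray.memoryAddress())) {
        Data.importIntoVectorSchemaRoot(allocator, arrowArray, root, provider);
      }
    }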
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
deleted file mode 100644
index dd90fd1c1d..0000000000
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.dataset.jni;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Hold pointers to a Arrow C++ RecordBatch.
- */
-public class NativeRecordBatchHandle {
-
-  private final long numRows;
-  private final List<Field> fields;
-  private final List<Buffer> buffers;
-
-  /**
-   * Constructor.
-   *
-   * @param numRows Total row number of the associated RecordBatch
-   * @param fields Metadata of fields
-   * @param buffers Retained Arrow buffers
-   */
-  public NativeRecordBatchHandle(long numRows, Field[] fields, Buffer[] buffers) {
-    this.numRows = numRows;
-    this.fields = Arrays.asList(fields);
-    this.buffers = Arrays.asList(buffers);
-  }
-
-  /**
-   * Returns the total row number of the associated RecordBatch.
-   * @return Total row number of the associated RecordBatch.
-   */
-  public long getNumRows() {
-    return numRows;
-  }
-
-  /**
-   * Returns Metadata of fields.
-   * @return Metadata of fields.
-   */
-  public List<Field> getFields() {
-    return fields;
-  }
-
-  /**
-   * Returns the buffers.
-   * @return Retained Arrow buffers.
-   */
-  public List<Buffer> getBuffers() {
-    return buffers;
-  }
-
-  /**
-   * Field metadata.
-   */
-  public static class Field {
-    public final long length;
-    public final long nullCount;
-
-    public Field(long length, long nullCount) {
-      this.length = length;
-      this.nullCount = nullCount;
-    }
-  }
-
-  /**
-   * Pointers and metadata of the targeted Arrow buffer.
-   */
-  public static class Buffer {
-    public final long nativeInstanceId;
-    public final long memoryAddress;
-    public final long size;
-    public final long capacity;
-
-    /**
-     * Constructor.
-     *
-     * @param nativeInstanceId Native instance's id
-     * @param memoryAddress Memory address of the first byte
-     * @param size Size (in bytes)
-     * @param capacity Capacity (in bytes)
-     */
-    public Buffer(long nativeInstanceId, long memoryAddress, long size, long capacity) {
-      this.nativeInstanceId = nativeInstanceId;
-      this.memoryAddress = memoryAddress;
-      this.size = size;
-      this.capacity = capacity;
-    }
-  }
-}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
index 14d89c2ee7..e4764236da 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java
@@ -18,6 +18,7 @@
 package org.apache.arrow.dataset.jni;
 
 import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.vector.ipc.ArrowReader;
 
 /**
  * Native implementation of {@link ScanTask}. Currently RecordBatches are iterated directly by the scanner
@@ -35,7 +36,7 @@ public class NativeScanTask implements ScanTask {
   }
 
   @Override
-  public BatchIterator execute() {
+  public ArrowReader execute() {
     return scanner.execute();
   }
 
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
index 24c298067a..de18f9e5e0 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java
@@ -18,23 +18,19 @@
 package org.apache.arrow.dataset.jni;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
-import org.apache.arrow.dataset.scanner.ScanTask;
+import org.apache.arrow.c.ArrowArray;
+import org.apache.arrow.c.Data;
 import org.apache.arrow.dataset.scanner.Scanner;
-import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.BufferLedger;
-import org.apache.arrow.memory.NativeUnderlyingMemory;
-import org.apache.arrow.memory.util.LargeMemoryUtil;
-import org.apache.arrow.vector.ipc.message.ArrowFieldNode;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.SchemaUtility;
@@ -60,7 +56,7 @@ public class NativeScanner implements Scanner {
     this.scannerId = scannerId;
   }
 
-  ScanTask.BatchIterator execute() {
+  ArrowReader execute() {
     if (closed) {
       throw new NativeInstanceReleasedException();
     }
@@ -68,67 +64,7 @@ public class NativeScanner implements Scanner {
       throw new UnsupportedOperationException("NativeScanner cannot be executed more than once. Consider creating " +
           "new scanner instead");
     }
-    return new ScanTask.BatchIterator() {
-      private ArrowRecordBatch peek = null;
-
-      @Override
-      public void close() {
-        NativeScanner.this.close();
-      }
-
-      @Override
-      public boolean hasNext() {
-        if (peek != null) {
-          return true;
-        }
-        final NativeRecordBatchHandle handle;
-        readLock.lock();
-        try {
-          if (closed) {
-            throw new NativeInstanceReleasedException();
-          }
-          handle = JniWrapper.get().nextRecordBatch(scannerId);
-        } finally {
-          readLock.unlock();
-        }
-        if (handle == null) {
-          return false;
-        }
-        final ArrayList<ArrowBuf> buffers = new ArrayList<>();
-        for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) {
-          final BufferAllocator allocator = context.getAllocator();
-          final int size = LargeMemoryUtil.checkedCastToInt(buffer.size);
-          final NativeUnderlyingMemory am = NativeUnderlyingMemory.create(allocator,
-              size, buffer.nativeInstanceId, buffer.memoryAddress);
-          BufferLedger ledger = am.associate(allocator);
-          ArrowBuf buf = new ArrowBuf(ledger, null, size, buffer.memoryAddress);
-          buffers.add(buf);
-        }
-
-        try {
-          final int numRows = LargeMemoryUtil.checkedCastToInt(handle.getNumRows());
-          peek = new ArrowRecordBatch(numRows, handle.getFields().stream()
-              .map(field -> new ArrowFieldNode(field.length, field.nullCount))
-              .collect(Collectors.toList()), buffers);
-          return true;
-        } finally {
-          buffers.forEach(buffer -> buffer.getReferenceManager().release());
-        }
-      }
-
-      @Override
-      public ArrowRecordBatch next() {
-        if (!hasNext()) {
-          throw new NoSuchElementException();
-        }
-
-        try {
-          return peek;
-        } finally {
-          peek = null;
-        }
-      }
-    };
+    return new NativeReader(context.getAllocator());
   }
 
   @Override
@@ -167,4 +103,59 @@ public class NativeScanner implements Scanner {
       writeLock.unlock();
     }
   }
+
+  /**
+   * {@link ArrowReader} implementation for NativeDataset.
+   */
+  public class NativeReader extends ArrowReader {
+
+    private NativeReader(BufferAllocator allocator) {
+      super(allocator);
+    }
+
+    @Override
+    protected void loadRecordBatch(ArrowRecordBatch batch) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean loadNextBatch() throws IOException {
+      readLock.lock();
+      try {
+        if (closed) {
+          throw new NativeInstanceReleasedException();
+        }
+        try (ArrowArray arrowArray = ArrowArray.allocateNew(context.getAllocator())) {
+          if (!JniWrapper.get().nextRecordBatch(scannerId, arrowArray.memoryAddress())) {
+            return false;
+          }
+          final VectorSchemaRoot vsr = getVectorSchemaRoot();
+          Data.importIntoVectorSchemaRoot(context.getAllocator(), arrowArray, vsr, this);
+        }
+      } finally {
+        readLock.unlock();
+      }
+      return true;
+    }
+
+    @Override
+    public long bytesRead() {
+      return 0L;
+    }
+
+    @Override
+    protected void closeReadSource() throws IOException {
+      // no-op
+    }
+
+    @Override
+    protected Schema readSchema() throws IOException {
+      return schema();
+    }
+  }
 }
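
One behavioral note on NativeReader: each loadNextBatch() call imports into
the same reused VectorSchemaRoot, so a batch must be processed or copied out
before the next call. A sketch using VectorUnloader to detach a batch, as the
updated TestDataset#collectTaskData below does:

    // Sketch: detach the current batch from the reader's reused root.
    // getRecordBatch() retains the underlying buffers, so the returned
    // ArrowRecordBatch stays valid after the root is overwritten; the
    // caller must close it when done.
    VectorSchemaRoot root = reader.getVectorSchemaRoot();
    ArrowRecordBatch detached = new VectorUnloader(root).getRecordBatch();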
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
index d07036a61e..434f5c9a6f 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java
@@ -17,9 +17,7 @@
 
 package org.apache.arrow.dataset.scanner;
 
-import java.util.Iterator;
-
-import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.ipc.ArrowReader;
 
 /**
  * Read record batches from a range of a single data fragment. A
@@ -29,14 +27,7 @@ import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 public interface ScanTask extends AutoCloseable {
 
   /**
-   * Creates and returns a {@link BatchIterator} instance.
+   * Execute this ScanTask and return an {@link ArrowReader} instance.
    */
-  BatchIterator execute();
-
-  /**
-   * The iterator implementation for {@link org.apache.arrow.vector.ipc.message.ArrowRecordBatch}s.
-   */
-  interface BatchIterator extends Iterator<ArrowRecordBatch>, AutoCloseable {
-
-  }
+  ArrowReader execute();
 }
diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
deleted file mode 100644
index 963fb61704..0000000000
--- a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlyingMemory.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import org.apache.arrow.dataset.jni.JniWrapper;
-
-/**
- * AllocationManager implementation for native allocated memory.
- */
-public class NativeUnderlyingMemory extends AllocationManager {
-
-  private final int size;
-  private final long nativeInstanceId;
-  private final long address;
-
-  /**
-   * Constructor.
-   *
-   * @param accountingAllocator The accounting allocator instance
-   * @param size Size of underlying memory (in bytes)
-   * @param nativeInstanceId ID of the native instance
-   */
-  NativeUnderlyingMemory(BufferAllocator accountingAllocator, int size, long nativeInstanceId, long address) {
-    super(accountingAllocator);
-    this.size = size;
-    this.nativeInstanceId = nativeInstanceId;
-    this.address = address;
-    // pre-allocate bytes on accounting allocator
-    final AllocationListener listener = accountingAllocator.getListener();
-    try (final AllocationReservation reservation = accountingAllocator.newReservation()) {
-      listener.onPreAllocation(size);
-      reservation.reserve(size);
-      listener.onAllocation(size);
-    } catch (Exception e) {
-      release0();
-      throw e;
-    }
-  }
-
-  /**
-   * Alias to constructor.
-   */
-  public static NativeUnderlyingMemory create(BufferAllocator bufferAllocator, int size, long nativeInstanceId,
-      long address) {
-    return new NativeUnderlyingMemory(bufferAllocator, size, nativeInstanceId, address);
-  }
-
-  public BufferLedger associate(BufferAllocator allocator) {
-    return super.associate(allocator);
-  }
-
-  @Override
-  protected void release0() {
-    JniWrapper.get().releaseBuffer(nativeInstanceId);
-  }
-
-  @Override
-  public long getSize() {
-    return size;
-  }
-
-  @Override
-  protected long memoryAddress() {
-    return address;
-  }
-}
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
index 51dac15e56..15224534d2 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
@@ -17,6 +17,8 @@
 
 package org.apache.arrow.dataset;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Spliterator;
@@ -26,11 +28,15 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.ScanTask;
 import org.apache.arrow.dataset.scanner.Scanner;
 import org.apache.arrow.dataset.source.Dataset;
 import org.apache.arrow.dataset.source.DatasetFactory;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.After;
@@ -56,15 +62,31 @@ public abstract class TestDataset {
   protected List<ArrowRecordBatch> collectResultFromFactory(DatasetFactory factory, ScanOptions options) {
     final Dataset dataset = factory.finish();
     final Scanner scanner = dataset.newScan(options);
-    final List<ArrowRecordBatch> ret = stream(scanner.scan())
-        .flatMap(t -> stream(t.execute()))
-        .collect(Collectors.toList());
     try {
+      final List<ArrowRecordBatch> ret = stream(scanner.scan())
+          .flatMap(t -> stream(collectTaskData(t)))
+          .collect(Collectors.toList());
       AutoCloseables.close(scanner, dataset);
+      return ret;
+    } catch (RuntimeException e) {
+      throw e;
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    return ret;
+  }
+
+  protected List<ArrowRecordBatch> collectTaskData(ScanTask scanTask) {
+    try (ArrowReader reader = scanTask.execute()) {
+      List<ArrowRecordBatch> batches = new ArrayList<>();
+      while (reader.loadNextBatch()) {
+        VectorSchemaRoot root = reader.getVectorSchemaRoot();
+        final VectorUnloader unloader = new VectorUnloader(root);
+        batches.add(unloader.getRecordBatch());
+      }
+      return batches;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   protected Schema inferResultSchemaFromFactory(DatasetFactory factory, ScanOptions options) {
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
index 83d57c7421..92610b1145 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/file/TestFileSystemDataset.java
@@ -44,11 +44,11 @@ import org.apache.arrow.dataset.jni.NativeScanTask;
 import org.apache.arrow.dataset.jni.NativeScanner;
 import org.apache.arrow.dataset.jni.TestNativeDataset;
 import org.apache.arrow.dataset.scanner.ScanOptions;
-import org.apache.arrow.dataset.scanner.ScanTask;
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.VectorLoader;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowReader;
 import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
 import org.apache.arrow.vector.types.Types;
 import org.apache.arrow.vector.types.pojo.Schema;
@@ -90,6 +90,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
     checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
 
     AutoCloseables.close(datum);
+    AutoCloseables.close(factory);
   }
 
   @Test
@@ -116,6 +117,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
                 .build()), datum);
 
     AutoCloseables.close(datum);
+    AutoCloseables.close(factory);
   }
 
   @Test
@@ -135,6 +137,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
     checkParquetReadResult(schema, writeSupport.getWrittenRecords(), datum);
 
     AutoCloseables.close(datum);
+    AutoCloseables.close(factory);
   }
 
   @Test
@@ -217,6 +220,8 @@ public class TestFileSystemDataset extends TestNativeDataset {
       dataset.close();
       dataset.close();
     });
+
+    AutoCloseables.close(factory);
   }
 
   @Test
@@ -232,13 +237,14 @@ public class TestFileSystemDataset extends TestNativeDataset {
     List<? extends NativeScanTask> taskList2 = collect(scanner.scan());
     NativeScanTask task1 = taskList1.get(0);
     NativeScanTask task2 = taskList2.get(0);
-    List<ArrowRecordBatch> datum = collect(task1.execute());
+    List<ArrowRecordBatch> datum = collectTaskData(task1);
+
+    AutoCloseables.close(datum);
 
     UnsupportedOperationException uoe = assertThrows(UnsupportedOperationException.class, task2::execute);
     Assertions.assertEquals("NativeScanner cannot be executed more than once. Consider creating new scanner instead",
         uoe.getMessage());
 
-    AutoCloseables.close(datum);
     AutoCloseables.close(taskList1);
     AutoCloseables.close(taskList2);
     AutoCloseables.close(scanner, dataset, factory);
@@ -256,7 +262,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
     NativeScanner scanner = dataset.newScan(options);
     List<? extends NativeScanTask> taskList = collect(scanner.scan());
     NativeScanTask task = taskList.get(0);
-    List<ArrowRecordBatch> datum = executor.submit(() -> collect(task.execute())).get();
+    List<ArrowRecordBatch> datum = executor.submit(() -> collectTaskData(task)).get();
 
     AutoCloseables.close(datum);
     AutoCloseables.close(taskList);
@@ -274,6 +280,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
     NativeScanner scanner = dataset.newScan(options);
     scanner.close();
     assertThrows(NativeInstanceReleasedException.class, scanner::scan);
+    AutoCloseables.close(factory);
   }
 
   @Test
@@ -289,6 +296,7 @@ public class TestFileSystemDataset extends TestNativeDataset {
     NativeScanTask task = tasks.get(0);
     task.close();
     assertThrows(NativeInstanceReleasedException.class, task::execute);
+    AutoCloseables.close(factory);
   }
 
   @Test
@@ -302,28 +310,10 @@ public class TestFileSystemDataset extends TestNativeDataset {
     NativeScanner scanner = dataset.newScan(options);
     List<? extends NativeScanTask> tasks = collect(scanner.scan());
     NativeScanTask task = tasks.get(0);
-    ScanTask.BatchIterator iterator = task.execute();
+    ArrowReader reader = task.execute();
     task.close();
-    assertThrows(NativeInstanceReleasedException.class, iterator::hasNext);
-  }
-
-  @Test
-  public void testMemoryAllocationOnAssociatedAllocator() throws Exception {
-    ParquetWriteSupport writeSupport = ParquetWriteSupport.writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a");
-    FileSystemDatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
-            FileFormat.PARQUET, writeSupport.getOutputURI());
-    ScanOptions options = new ScanOptions(100);
-    long initReservation = rootAllocator().getAllocatedMemory();
-    List<ArrowRecordBatch> datum = collectResultFromFactory(factory, options);
-    final long expected_diff = datum.stream()
-            .flatMapToLong(batch -> batch.getBuffers()
-                    .stream()
-                    .mapToLong(buf -> buf.getReferenceManager().getAccountedSize())).sum();
-    long reservation = rootAllocator().getAllocatedMemory();
-    AutoCloseables.close(datum);
-    long finalReservation = rootAllocator().getAllocatedMemory();
-    Assert.assertEquals(expected_diff, reservation - initReservation);
-    Assert.assertEquals(-expected_diff, finalReservation - reservation);
+    assertThrows(NativeInstanceReleasedException.class, reader::loadNextBatch);
+    AutoCloseables.close(factory);
   }
 
   private void checkParquetReadResult(Schema schema, String expectedJson, List<ArrowRecordBatch> actual)
diff --git a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java b/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
deleted file mode 100644
index c81868e42b..0000000000
--- a/java/dataset/src/test/java/org/apache/arrow/memory/TestNativeUnderlyingMemory.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.arrow.memory;
-
-import static org.junit.Assert.*;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-public class TestNativeUnderlyingMemory {
-
-  private RootAllocator allocator = null;
-
-  @Before
-  public void setUp() {
-    allocator = new RootAllocator(Long.MAX_VALUE);
-  }
-
-  @After
-  public void tearDown() {
-    allocator.close();
-  }
-
-  protected RootAllocator rootAllocator() {
-    return allocator;
-  }
-
-  @Test
-  public void testReservation() {
-    final RootAllocator root = rootAllocator();
-
-    final int size = 512;
-    final AllocationManager am = new MockUnderlyingMemory(root, size);
-    final BufferLedger ledger = am.associate(root);
-
-    assertEquals(size, root.getAllocatedMemory());
-
-    ledger.release();
-  }
-
-  @Test
-  public void testBufferTransfer() {
-    final RootAllocator root = rootAllocator();
-
-    ChildAllocator allocator1 = (ChildAllocator) root.newChildAllocator("allocator1", 0, Long.MAX_VALUE);
-    ChildAllocator allocator2 = (ChildAllocator) root.newChildAllocator("allocator2", 0, Long.MAX_VALUE);
-    assertEquals(0, allocator1.getAllocatedMemory());
-    assertEquals(0, allocator2.getAllocatedMemory());
-
-    final int size = 512;
-    final AllocationManager am = new MockUnderlyingMemory(allocator1, size);
-
-    final BufferLedger owningLedger = am.associate(allocator1);
-    assertEquals(size, owningLedger.getAccountedSize());
-    assertEquals(size, owningLedger.getSize());
-    assertEquals(size, allocator1.getAllocatedMemory());
-
-    final BufferLedger transferredLedger = am.associate(allocator2);
-    owningLedger.release(); // release previous owner
-    assertEquals(0, owningLedger.getAccountedSize());
-    assertEquals(size, owningLedger.getSize());
-    assertEquals(size, transferredLedger.getAccountedSize());
-    assertEquals(size, transferredLedger.getSize());
-    assertEquals(0, allocator1.getAllocatedMemory());
-    assertEquals(size, allocator2.getAllocatedMemory());
-
-    transferredLedger.release();
-    allocator1.close();
-    allocator2.close();
-  }
-
-  /**
-   * A mock class of {@link NativeUnderlyingMemory} for unit testing about size-related operations.
-   */
-  private static class MockUnderlyingMemory extends NativeUnderlyingMemory {
-
-    /**
-     * Constructor.
-     */
-    MockUnderlyingMemory(BaseAllocator accountingAllocator, int size) {
-      super(accountingAllocator, size, -1L, -1L);
-    }
-
-    @Override
-    protected void release0() {
-      System.out.println("Underlying memory released. Size: " + getSize());
-    }
-
-    @Override
-    protected long memoryAddress() {
-      throw new UnsupportedOperationException();
-    }
-  }
-}
diff --git a/java/pom.xml b/java/pom.xml
index edcaac6b87..c74023c9e7 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -716,6 +716,7 @@
         <module>adapter/orc</module>
         <module>gandiva</module>
         <module>dataset</module>
+        <module>c</module>
       </modules>
     </profile>