You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/05/24 18:25:42 UTC

[arrow] branch main updated: GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ (#34227)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 95c33d82e6 GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ (#34227)
95c33d82e6 is described below

commit 95c33d82e67c9500b08ea7086a46b447d65bc14b
Author: david dali susanibar arce <da...@gmail.com>
AuthorDate: Wed May 24 13:25:30 2023 -0500

    GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ (#34227)
    
    * Closes: #34223
    
    The purpose of this PR is to implement:
    
    1. JNI Wrappers to consume Acero capabilities that execute Substrait Plans.
    2. Java base code to offer API that consume Substrait Plans.
    3. Initial Substrait documentation
    
    Lead-authored-by: david dali susanibar arce <da...@gmail.com>
    Co-authored-by: David Li <li...@gmail.com>
    Signed-off-by: David Li <li...@gmail.com>
---
 ci/scripts/java_jni_macos_build.sh                 |   1 +
 ci/scripts/java_jni_manylinux_build.sh             |   1 +
 ci/scripts/java_jni_windows_build.sh               |   1 +
 docs/source/java/index.rst                         |   1 +
 docs/source/java/substrait.rst                     | 107 +++++++++++
 java/dataset/CMakeLists.txt                        |   9 +-
 java/dataset/src/main/cpp/jni_wrapper.cc           | 118 ++++++++++++
 .../dataset/substrait/AceroSubstraitConsumer.java  | 142 ++++++++++++++
 .../apache/arrow/dataset/substrait/JniWrapper.java |  73 ++++++++
 .../java/org/apache/arrow/dataset/TestDataset.java |   6 +-
 .../substrait/TestAceroSubstraitConsumer.java      | 207 +++++++++++++++++++++
 .../src/test/resources/avroschema/user.avsc        |   2 +-
 .../resources/substrait/local_files_users.json     |  75 ++++++++
 .../resources/substrait/named_table_users.json     |  70 +++++++
 java/pom.xml                                       |   1 +
 15 files changed, 809 insertions(+), 5 deletions(-)

diff --git a/ci/scripts/java_jni_macos_build.sh b/ci/scripts/java_jni_macos_build.sh
index 97d6c4cf6c..c38f072709 100755
--- a/ci/scripts/java_jni_macos_build.sh
+++ b/ci/scripts/java_jni_macos_build.sh
@@ -72,6 +72,7 @@ cmake \
   -DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS} \
   -DARROW_CSV=${ARROW_DATASET} \
   -DARROW_DATASET=${ARROW_DATASET} \
+  -DARROW_SUBSTRAIT=${ARROW_DATASET} \
   -DARROW_DEPENDENCY_USE_SHARED=OFF \
   -DARROW_GANDIVA=${ARROW_GANDIVA} \
   -DARROW_GANDIVA_STATIC_LIBSTDCPP=ON \
diff --git a/ci/scripts/java_jni_manylinux_build.sh b/ci/scripts/java_jni_manylinux_build.sh
index 211dff8cf2..4e1192a4db 100755
--- a/ci/scripts/java_jni_manylinux_build.sh
+++ b/ci/scripts/java_jni_manylinux_build.sh
@@ -72,6 +72,7 @@ cmake \
   -DARROW_BUILD_TESTS=ON \
   -DARROW_CSV=${ARROW_DATASET} \
   -DARROW_DATASET=${ARROW_DATASET} \
+  -DARROW_SUBSTRAIT=${ARROW_DATASET} \
   -DARROW_DEPENDENCY_SOURCE="VCPKG" \
   -DARROW_DEPENDENCY_USE_SHARED=OFF \
   -DARROW_GANDIVA_PC_CXX_FLAGS=${GANDIVA_CXX_FLAGS} \
diff --git a/ci/scripts/java_jni_windows_build.sh b/ci/scripts/java_jni_windows_build.sh
index 954c04050f..778ee96967 100755
--- a/ci/scripts/java_jni_windows_build.sh
+++ b/ci/scripts/java_jni_windows_build.sh
@@ -61,6 +61,7 @@ cmake \
   -DARROW_BUILD_TESTS=ON \
   -DARROW_CSV=${ARROW_DATASET} \
   -DARROW_DATASET=${ARROW_DATASET} \
+  -DARROW_SUBSTRAIT=${ARROW_DATASET} \
   -DARROW_DEPENDENCY_USE_SHARED=OFF \
   -DARROW_ORC=${ARROW_ORC} \
   -DARROW_PARQUET=${ARROW_PARQUET} \
diff --git a/docs/source/java/index.rst b/docs/source/java/index.rst
index a1e924f9c0..9b555e297b 100644
--- a/docs/source/java/index.rst
+++ b/docs/source/java/index.rst
@@ -37,6 +37,7 @@ on the Arrow format and other language bindings see the :doc:`parent documentati
    flight_sql
    flight_sql_jdbc_driver
    dataset
+   substrait
    cdata
    jdbc
    Reference (javadoc) <reference/index>
diff --git a/docs/source/java/substrait.rst b/docs/source/java/substrait.rst
new file mode 100644
index 0000000000..41effedbf0
--- /dev/null
+++ b/docs/source/java/substrait.rst
@@ -0,0 +1,107 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero <../cpp/streaming_execution>`
+query engine.
+
+Executing Substrait Plans
+=========================
+
+Plans can reference data in files via URIs, or "named tables" that must be provided along with the plan.
+
+Here is an example of a Java program that queries a Parquet file using Java Substrait
+(this example uses the `Substrait Java`_ project to compile a SQL query to a Substrait plan):
+
+.. code-block:: Java
+
+    import com.google.common.collect.ImmutableList;
+    import io.substrait.isthmus.SqlToSubstrait;
+    import io.substrait.proto.Plan;
+    import org.apache.arrow.dataset.file.FileFormat;
+    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+    import org.apache.arrow.dataset.jni.NativeMemoryPool;
+    import org.apache.arrow.dataset.scanner.ScanOptions;
+    import org.apache.arrow.dataset.scanner.Scanner;
+    import org.apache.arrow.dataset.source.Dataset;
+    import org.apache.arrow.dataset.source.DatasetFactory;
+    import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+    import org.apache.calcite.sql.parser.SqlParseException;
+
+    import java.nio.ByteBuffer;
+    import java.util.HashMap;
+    import java.util.Map;
+
+    public class ClientSubstrait {
+        public static void main(String[] args) {
+            String uri = "file:///data/tpch_parquet/nation.parquet";
+            ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+            try (
+                BufferAllocator allocator = new RootAllocator();
+                DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+                        FileFormat.PARQUET, uri);
+                Dataset dataset = datasetFactory.finish();
+                Scanner scanner = dataset.newScan(options);
+                ArrowReader reader = scanner.scanBatches()
+            ) {
+                // map table to reader
+                Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+                mapTableToArrowReader.put("NATION", reader);
+                // get binary plan
+                Plan plan = getPlan();
+                ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
+                substraitPlan.put(plan.toByteArray());
+                // run query
+                try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
+                        substraitPlan,
+                        mapTableToArrowReader
+                )) {
+                    while (arrowReader.loadNextBatch()) {
+                        System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                    }
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        static Plan getPlan() throws SqlParseException {
+            String sql = "SELECT * from nation";
+            String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
+                    "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+            SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+            Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
+            return plan;
+        }
+    }
+
+.. code-block:: text
+
+    // Results example:
+    FieldPath(0)	FieldPath(1)	FieldPath(2)	FieldPath(3)
+    0	ALGERIA	0	 haggle. carefully final deposits detect slyly agai
+    1	ARGENTINA	1	al foxes promise slyly according to the regular accounts. bold requests alon
+
+.. _`Substrait`: https://substrait.io/
+.. _`Substrait Java`: https://github.com/substrait-io/substrait-java
+.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
\ No newline at end of file
diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt
index 315163a537..ede3ee7330 100644
--- a/java/dataset/CMakeLists.txt
+++ b/java/dataset/CMakeLists.txt
@@ -16,6 +16,7 @@
 # under the License.
 
 find_package(ArrowDataset REQUIRED)
+find_package(ArrowSubstrait REQUIRED)
 
 include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}
                     ${JNI_INCLUDE_DIRS} ${JNI_HEADERS_DIR})
@@ -26,14 +27,18 @@ add_jar(arrow_java_jni_dataset_jar
         src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
         src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
         src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
+        src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
         GENERATE_NATIVE_HEADERS
         arrow_java_jni_dataset_headers)
 
 add_library(arrow_java_jni_dataset SHARED src/main/cpp/jni_wrapper.cc
                                           src/main/cpp/jni_util.cc)
 set_property(TARGET arrow_java_jni_dataset PROPERTY OUTPUT_NAME "arrow_dataset_jni")
-target_link_libraries(arrow_java_jni_dataset arrow_java_jni_dataset_headers jni
-                      ArrowDataset::arrow_dataset_static)
+target_link_libraries(arrow_java_jni_dataset
+                      arrow_java_jni_dataset_headers
+                      jni
+                      ArrowDataset::arrow_dataset_static
+                      ArrowSubstrait::arrow_substrait_static)
 
 if(BUILD_TESTING)
   add_executable(arrow-java-jni-dataset-test src/main/cpp/jni_util_test.cc
diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index b3b5fe18c7..48191eac49 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <mutex>
+#include <unordered_map>
 
 #include "arrow/array.h"
 #include "arrow/array/concatenate.h"
@@ -24,12 +25,14 @@
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/filesystem/localfs.h"
+#include "arrow/engine/substrait/util.h"
 #include "arrow/ipc/api.h"
 #include "arrow/util/iterator.h"
 #include "jni_util.h"
 #include "org_apache_arrow_dataset_file_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_JniWrapper.h"
 #include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
+#include "org_apache_arrow_dataset_substrait_JniWrapper.h"
 
 namespace {
 
@@ -261,6 +264,52 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   default_memory_pool_id = -1L;
 }
 
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data, as a decimal string.  This function will
+/// eagerly read all tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(JNIEnv* env, const jobjectArray& str_array) {
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader;
+  int length = env->GetArrayLength(str_array);
+  if (length % 2 != 0) {
+    JniThrow("Can not map odd number of array elements to key/value pairs");
+  }
+  for (int pos = 0; pos < length; pos += 2) {
+    auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos + 1));
+    uintptr_t memory_address = 0;
+    try {
+      // Parse as unsigned long long rather than long: `long` is only 32 bits wide on
+      // LLP64 platforms such as 64-bit Windows, which is too narrow to hold a pointer.
+      memory_address = static_cast<uintptr_t>(std::stoull(JStringToCString(env, j_string_value)));
+    } catch(const std::exception& ex) {
+      JniThrow("Failed to parse memory address from string value. Error: " + std::string(ex.what()));
+    } catch (...) {
+      JniThrow("Failed to parse memory address from string value.");
+    }
+    auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(memory_address);
+    std::shared_ptr<arrow::RecordBatchReader> readerIn = JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+    std::shared_ptr<arrow::Table> output_table = JniGetOrThrow(readerIn->ToTable());
+    map_table_to_record_batch_reader[JStringToCString(env, j_string_key)] = output_table;
+  }
+  return map_table_to_record_batch_reader;
+}
+
+/// Resolve a single-component table name to the Table registered under it.
+/// Multi-component names and names missing from `tables` are reported via JniThrow.
+std::shared_ptr<arrow::Table> GetTableByName(const std::vector<std::string>& names,
+    const std::unordered_map<std::string, std::shared_ptr<arrow::Table>>& tables) {
+  if (names.size() != 1) {
+    JniThrow("Tables with hierarchical names are not supported");
+  }
+  const std::string& table_name = names.front();
+  const auto entry = tables.find(table_name);
+  if (entry == tables.end()) JniThrow("Table is referenced, but not provided: " + table_name);
+  return entry->second;
+}
+
 /*
  * Class:     org_apache_arrow_dataset_jni_NativeMemoryPool
  * Method:    getDefaultMemoryPool
@@ -578,3 +627,72 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
   JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
   JNI_METHOD_END()
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlan
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ *
+ * Executes a JSON Substrait plan and exports the resulting reader to the
+ * ArrowArrayStream located at memory_address_output.
+ */
+JNIEXPORT void JNICALL
+    Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlan__Ljava_lang_String_2_3Ljava_lang_String_2J (
+    JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input,
+    jlong memory_address_output) {
+  JNI_METHOD_START
+  // Eagerly import the named tables supplied by the Java caller.
+  auto named_tables = LoadNamedTables(env, table_to_memory_address_input);
+  // Resolve named-table references in the plan against those tables.
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider =
+    [&named_tables](const std::vector<std::string>& names, const arrow::Schema&) {
+    // Wrap the resolved table in a table_source node labelled java_source.
+    std::shared_ptr<arrow::acero::ExecNodeOptions> source_options =
+      std::make_shared<arrow::acero::TableSourceNodeOptions>(GetTableByName(names, named_tables));
+    return arrow::acero::Declaration("table_source", {}, source_options, "java_source");
+  };
+  // Convert the JSON text to a serialized plan, then run it.
+  auto serialized_plan = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan)));
+  auto result_reader = JniGetOrThrow(
+    arrow::engine::ExecuteSerializedPlan(*serialized_plan, nullptr, nullptr, conversion_options));
+  auto* exported_stream = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(result_reader, exported_stream));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlan
+ * Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
+ * Runs a binary Substrait plan held in a direct ByteBuffer; exports the result to memory_address_output.
+ */
+JNIEXPORT void JNICALL
+    Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlan__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J (
+    JNIEnv* env, jobject, jobject plan, jobjectArray table_to_memory_address_input,
+    jlong memory_address_output) {
+  JNI_METHOD_START
+  // import the named tables and expose them to the plan through a table provider
+  auto map_table_to_reader = LoadNamedTables(env, table_to_memory_address_input);
+  arrow::engine::NamedTableProvider table_provider =
+    [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) {
+    std::shared_ptr<arrow::acero::ExecNodeOptions> options =
+      std::make_shared<arrow::acero::TableSourceNodeOptions>(GetTableByName(names, map_table_to_reader));
+    return arrow::acero::Declaration("table_source", {}, options, "java_source");
+  };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // mapping arrow::Buffer; JNI reports a non-direct ByteBuffer as a null address / capacity of -1
+  auto* buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
+  jlong length = env->GetDirectBufferCapacity(plan);
+  if (buff == nullptr || length < 0) {
+    JniThrow("Substrait plan must be provided as a direct ByteBuffer");
+  }
+  std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, static_cast<size_t>(length));
+  // execute plan
+  auto reader_out = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, conversion_options));
+  auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader_out, arrow_stream_out));
+  JNI_METHOD_END()
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java
new file mode 100644
index 0000000000..d5a29ad4e9
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * End-user entry point for running Substrait plans on Acero from Java. The only operation offered is
+ * consuming a plan, supplied either as JSON text (String) or in binary form (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Execute a JSON Substrait plan without supplying any named tables.
+   *
+   * @param plan The JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Execute a JSON Substrait plan, resolving named tables from the supplied readers.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables Table name (key) to the ArrowReader supplying that table's data (value), for
+   *                    the named tables referenced by the plan.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Execute a binary Substrait plan without supplying any named tables.
+   *
+   * @param plan the binary Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Execute a binary Substrait plan, resolving named tables from the supplied readers.
+   *
+   * @param plan the binary Substrait plan.
+   * @param namedTables Table name (key) to the ArrowReader supplying that table's data (value), for
+   *                    the named tables referenced by the plan.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Shared path for JSON plans: export each named table as a C stream, invoke the native
+   * executor, then import the output stream as an ArrowReader.
+   */
+  private ArrowReader execute(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    final List<ArrowArrayStream> inputStreams = new ArrayList<>();
+    try (ArrowArrayStream outputStream = ArrowArrayStream.allocateNew(this.allocator)) {
+      final String[] tableAddressPairs = getMapTableToMemoryAddress(namedTables, inputStreams);
+      // The native side exports its result into the struct at outputStream's address.
+      JniWrapper.get().executeSerializedPlan(plan, tableAddressPairs, outputStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, outputStream);
+    } finally {
+      // Runs after outputStream is closed; closes the streams created for the named tables.
+      AutoCloseables.close(inputStreams);
+    }
+  }
+
+  /**
+   * Shared path for binary plans: export each named table as a C stream, invoke the native
+   * executor, then import the output stream as an ArrowReader.
+   */
+  private ArrowReader execute(ByteBuffer plan, Map<String, ArrowReader> namedTables) throws Exception {
+    final List<ArrowArrayStream> inputStreams = new ArrayList<>();
+    try (ArrowArrayStream outputStream = ArrowArrayStream.allocateNew(this.allocator)) {
+      final String[] tableAddressPairs = getMapTableToMemoryAddress(namedTables, inputStreams);
+      // The native side exports its result into the struct at outputStream's address.
+      JniWrapper.get().executeSerializedPlan(plan, tableAddressPairs, outputStream.memoryAddress());
+      return Data.importArrayStream(this.allocator, outputStream);
+    } finally {
+      // Runs after outputStream is closed; closes the streams created for the named tables.
+      AutoCloseables.close(inputStreams);
+    }
+  }
+
+  // Flattens the map into [name0, address0, name1, address1, ...]; created streams go to listStreamInput.
+  private String[] getMapTableToMemoryAddress(Map<String, ArrowReader> mapTableToArrowReader,
+                                              List<ArrowArrayStream> listStreamInput) {
+    final String[] flattened = new String[mapTableToArrowReader.size() * 2];
+    int next = 0;
+    for (Map.Entry<String, ArrowReader> table : mapTableToArrowReader.entrySet()) {
+      final ArrowArrayStream exported = ArrowArrayStream.allocateNew(this.allocator);
+      // Register before exporting so the caller can still close the stream if the export fails.
+      listStreamInput.add(exported);
+      Data.exportArrayStream(this.allocator, table.getValue(), exported);
+      flattened[next++] = table.getKey();
+      flattened[next++] = String.valueOf(exported.memoryAddress());
+    }
+    return flattened;
+  }
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
new file mode 100644
index 0000000000..236d1d5616
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * Class that contains native methods to call the Acero C++ Substrait API. It internally depends on the C++
+ * function arrow::engine::ExecuteSerializedPlan. The currently supported input parameters are:
+ * <pre>
+ * - arrow::Buffer: Substrait Plan (JSON or Binary format).
+ * - arrow::engine::ConversionOptions: Mapping for arrow::engine::NamedTableProvider.
+ * </pre>
+ */
+final class JniWrapper {
+  private static final JniWrapper INSTANCE = new JniWrapper();
+
+  private JniWrapper() {
+  }
+
+  public static JniWrapper get() {
+    JniLoader.get().ensureLoaded();
+    return INSTANCE;
+  }
+
+  /**
+   * Consume the JSON Substrait Plan that contains Named Tables and export the RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param mapTableToMemoryAddressInput flattened name/address pairs: the table name at position `i` and the
+   *                                     memory address of its ArrowArrayStream, as a string, at `i+1`.
+   * <pre>{@code String[] mapTableToMemoryAddress = new String[2];
+   * mapTableToMemoryAddress[0]="NATION";
+   * mapTableToMemoryAddress[1]="140650250895360";}</pre>
+   * @param memoryAddressOutput the memory address where RecordBatchReader is exported.
+   *
+   */
+  public native void executeSerializedPlan(String planInput, String[] mapTableToMemoryAddressInput,
+                                                      long memoryAddressOutput);
+
+  /**
+   * Consume the binary Substrait Plan that contains Named Tables and export the RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the binary Substrait plan.
+   * @param mapTableToMemoryAddressInput flattened name/address pairs: the table name at position `i` and the
+   *                                     memory address of its ArrowArrayStream, as a string, at `i+1`.
+   * <pre>{@code String[] mapTableToMemoryAddress = new String[2];
+   * mapTableToMemoryAddress[0]="NATION";
+   * mapTableToMemoryAddress[1]="140650250895360";}</pre>
+   * @param memoryAddressOutput the memory address where RecordBatchReader is exported.
+   */
+  public native void executeSerializedPlan(ByteBuffer planInput, String[] mapTableToMemoryAddressInput,
+                                                      long memoryAddressOutput);
+}
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
index 2516c40959..af2abeee21 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
@@ -31,6 +31,7 @@ import org.apache.arrow.dataset.scanner.ScanOptions;
 import org.apache.arrow.dataset.scanner.Scanner;
 import org.apache.arrow.dataset.source.Dataset;
 import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.util.AutoCloseables;
 import org.apache.arrow.vector.VectorSchemaRoot;
@@ -41,8 +42,9 @@ import org.apache.arrow.vector.types.pojo.Schema;
 import org.junit.After;
 import org.junit.Before;
 
+
 public abstract class TestDataset {
-  private RootAllocator allocator = null;
+  private BufferAllocator allocator = null;
 
   @Before
   public void setUp() {
@@ -54,7 +56,7 @@ public abstract class TestDataset {
     allocator.close();
   }
 
-  protected RootAllocator rootAllocator() {
+  protected BufferAllocator rootAllocator() {
     return allocator;
   }
 
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
new file mode 100644
index 0000000000..c23b7e0028
--- /dev/null
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    // Plan under test (substrait/local_files_users.json): SELECT id, name FROM Users
+    // Generated with Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    // The expected schema models VARCHAR(150) as Utf8 carrying this field metadata:
+    // {ARROW:extension:name=varchar, ARROW:extension:metadata=varchar{length:150}}
+    Map<String, String> metadataName = new HashMap<>();
+    metadataName.put("ARROW:extension:name", "varchar");
+    metadataName.put("ARROW:extension:metadata", "varchar{length:150}");
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        new Field("NAME", new FieldType(true, new ArrowType.Utf8(), null, metadataName), null)
+    ), Collections.emptyMap());
+    // Parquet input for the plan; the row-count assertion below expects it to hold three rows.
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    // Decode the plan template explicitly as UTF-8 (not the platform default charset) and point its
+    // FILENAME_PLACEHOLDER at the Parquet file written above.
+    String plan = new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+        .getResource("substrait/local_files_users.json").toURI())), java.nio.charset.StandardCharsets.UTF_8)
+        .replace("FILENAME_PLACEHOLDER", writeSupport.getOutputURI());
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(plan)) {
+      assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+      int rowcount = 0;
+      while (arrowReader.loadNextBatch()) {
+        rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    // Plan under test (substrait/named_table_users.json): SELECT id, name FROM Users
+    // The plan reads the named table "USERS", bound below to a reader over a Parquet dataset.
+    // Generated with Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS", reader);
+      try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI())),
+              java.nio.charset.StandardCharsets.UTF_8),
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testRunQueryNamedTableNationWithException() throws Exception {
+    // Plan under test (substrait/named_table_users.json): SELECT id, name FROM Users, generated with Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    // The plan reads the named table "USERS", but only "USERS_INVALID_MAP" is registered below, so a
+    // RuntimeException is expected (see @Test); which call raises it is not pinned down by this test.
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS_INVALID_MAP", reader);
+      try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI())), java.nio.charset.StandardCharsets.UTF_8),
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  @Test
+  public void testRunBinaryQueryNamedTableNation() throws Exception {
+    // Query: SELECT id, name FROM Users, supplied here as a serialized (binary) Substrait plan.
+    // Isthmus command for the query:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    // The Base64 text was obtained with: Base64.getEncoder().encodeToString(plan.toByteArray());
+    final String encodedPlan =
+        "Gl8SXQpROk8KBhIECgICAxIvCi0KAgoAEh4KAklECgROQU1FEhIKBCoCEAEKC" +
+            "LIBBQiWARgBGAI6BwoFVVNFUlMaCBIGCgISACIAGgoSCAoEEgIIASIAEgJJRBIETkFNRQ==";
+    // ID and NAME are both expected to be nullable in the result.
+    final Schema expectedSchema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport parquetFile = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    ScanOptions scanOptions = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory factory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, parquetFile.getOutputURI());
+        Dataset dataset = factory.finish();
+        Scanner scanner = dataset.newScan(scanOptions);
+        ArrowReader usersReader = scanner.scanBatches()
+    ) {
+      // Copy the decoded plan into a direct ByteBuffer, the form handed to runQuery.
+      final Base64.Decoder decoder = Base64.getDecoder();
+      final byte[] planBytes = decoder.decode(encodedPlan);
+      final ByteBuffer planBuffer = ByteBuffer.allocateDirect(planBytes.length);
+      planBuffer.put(planBytes);
+      // Register the Parquet-backed reader under the table name "USERS".
+      Map<String, ArrowReader> tablesByName = new HashMap<>();
+      tablesByName.put("USERS", usersReader);
+      // Execute the plan and verify the resulting schema and total row count.
+      try (ArrowReader resultReader = new AceroSubstraitConsumer(rootAllocator())
+          .runQuery(planBuffer, tablesByName)) {
+        final Schema actualSchema = resultReader.getVectorSchemaRoot().getSchema();
+        assertEquals(expectedSchema, actualSchema);
+        int totalRows = 0;
+        while (resultReader.loadNextBatch()) {
+          totalRows += resultReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, totalRows);
+      }
+    }
+  }
+}
diff --git a/java/dataset/src/test/resources/avroschema/user.avsc b/java/dataset/src/test/resources/avroschema/user.avsc
index 072b643912..5a4635b6dc 100644
--- a/java/dataset/src/test/resources/avroschema/user.avsc
+++ b/java/dataset/src/test/resources/avroschema/user.avsc
@@ -18,7 +18,7 @@
 {
  "namespace": "org.apache.arrow.dataset",
  "type": "record",
- "name": "User",
+ "name": "Users",
  "fields": [
      {"name": "id", "type": ["int", "null"]},
      {"name": "name", "type": ["string", "null"]}
diff --git a/java/dataset/src/test/resources/substrait/local_files_users.json b/java/dataset/src/test/resources/substrait/local_files_users.json
new file mode 100644
index 0000000000..a2f5af1b3b
--- /dev/null
+++ b/java/dataset/src/test/resources/substrait/local_files_users.json
@@ -0,0 +1,75 @@
+{
+  "extensionUris": [],
+  "extensions": [],
+  "relations": [{
+    "root": {
+      "input": {
+        "project": {
+          "common": {
+            "emit": {
+              "outputMapping": [2, 3]
+            }
+          },
+          "input": {
+            "read": {
+              "common": {
+                "direct": {
+                }
+              },
+              "baseSchema": {
+                "names": ["ID", "NAME"],
+                "struct": {
+                  "types": [{
+                    "i32": {
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  }, {
+                    "varchar": {
+                      "length": 150,
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  }],
+                  "typeVariationReference": 0,
+                  "nullability": "NULLABILITY_REQUIRED"
+                }
+              },
+              "local_files": {
+                "items": [
+                  {
+                    "uri_file": "FILENAME_PLACEHOLDER",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "expressions": [{
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 0
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }, {
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 1
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }]
+        }
+      },
+      "names": ["ID", "NAME"]
+    }
+  }],
+  "expectedTypeUrls": []
+}
\ No newline at end of file
diff --git a/java/dataset/src/test/resources/substrait/named_table_users.json b/java/dataset/src/test/resources/substrait/named_table_users.json
new file mode 100644
index 0000000000..629eebd059
--- /dev/null
+++ b/java/dataset/src/test/resources/substrait/named_table_users.json
@@ -0,0 +1,70 @@
+{
+  "extensionUris": [],
+  "extensions": [],
+  "relations": [{
+    "root": {
+      "input": {
+        "project": {
+          "common": {
+            "emit": {
+              "outputMapping": [2, 3]
+            }
+          },
+          "input": {
+            "read": {
+              "common": {
+                "direct": {
+                }
+              },
+              "baseSchema": {
+                "names": ["ID", "NAME"],
+                "struct": {
+                  "types": [{
+                    "i32": {
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_REQUIRED"
+                    }
+                  }, {
+                    "varchar": {
+                      "length": 150,
+                      "typeVariationReference": 0,
+                      "nullability": "NULLABILITY_NULLABLE"
+                    }
+                  }],
+                  "typeVariationReference": 0,
+                  "nullability": "NULLABILITY_REQUIRED"
+                }
+              },
+              "namedTable": {
+                "names": ["USERS"]
+              }
+            }
+          },
+          "expressions": [{
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 0
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }, {
+            "selection": {
+              "directReference": {
+                "structField": {
+                  "field": 1
+                }
+              },
+              "rootReference": {
+              }
+            }
+          }]
+        }
+      },
+      "names": ["ID", "NAME"]
+    }
+  }],
+  "expectedTypeUrls": []
+}
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index 05a895d5d5..2a7a3b4920 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -151,6 +151,7 @@
             <exclude>**/*.tbl</exclude>
             <exclude>**/*.iml</exclude>
             <exclude>**/flight.properties</exclude>
+            <exclude>**/*.idea/**</exclude>
           </excludes>
         </configuration>
       </plugin>