You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by li...@apache.org on 2023/05/24 18:25:42 UTC
[arrow] branch main updated: GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ (#34227)
This is an automated email from the ASF dual-hosted git repository.
lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 95c33d82e6 GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ (#34227)
95c33d82e6 is described below
commit 95c33d82e67c9500b08ea7086a46b447d65bc14b
Author: david dali susanibar arce <da...@gmail.com>
AuthorDate: Wed May 24 13:25:30 2023 -0500
GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++ (#34227)
* Closes: #34223
The purpose of this PR is to implement:
1. JNI Wrappers to consume Acero capabilities that execute Substrait Plans.
2. Java base code to offer an API that consumes Substrait Plans.
3. Initial Substrait documentation
Lead-authored-by: david dali susanibar arce <da...@gmail.com>
Co-authored-by: David Li <li...@gmail.com>
Signed-off-by: David Li <li...@gmail.com>
---
ci/scripts/java_jni_macos_build.sh | 1 +
ci/scripts/java_jni_manylinux_build.sh | 1 +
ci/scripts/java_jni_windows_build.sh | 1 +
docs/source/java/index.rst | 1 +
docs/source/java/substrait.rst | 107 +++++++++++
java/dataset/CMakeLists.txt | 9 +-
java/dataset/src/main/cpp/jni_wrapper.cc | 118 ++++++++++++
.../dataset/substrait/AceroSubstraitConsumer.java | 142 ++++++++++++++
.../apache/arrow/dataset/substrait/JniWrapper.java | 73 ++++++++
.../java/org/apache/arrow/dataset/TestDataset.java | 6 +-
.../substrait/TestAceroSubstraitConsumer.java | 207 +++++++++++++++++++++
.../src/test/resources/avroschema/user.avsc | 2 +-
.../resources/substrait/local_files_users.json | 75 ++++++++
.../resources/substrait/named_table_users.json | 70 +++++++
java/pom.xml | 1 +
15 files changed, 809 insertions(+), 5 deletions(-)
diff --git a/ci/scripts/java_jni_macos_build.sh b/ci/scripts/java_jni_macos_build.sh
index 97d6c4cf6c..c38f072709 100755
--- a/ci/scripts/java_jni_macos_build.sh
+++ b/ci/scripts/java_jni_macos_build.sh
@@ -72,6 +72,7 @@ cmake \
-DARROW_BUILD_TESTS=${ARROW_BUILD_TESTS} \
-DARROW_CSV=${ARROW_DATASET} \
-DARROW_DATASET=${ARROW_DATASET} \
+ -DARROW_SUBSTRAIT=${ARROW_DATASET} \
-DARROW_DEPENDENCY_USE_SHARED=OFF \
-DARROW_GANDIVA=${ARROW_GANDIVA} \
-DARROW_GANDIVA_STATIC_LIBSTDCPP=ON \
diff --git a/ci/scripts/java_jni_manylinux_build.sh b/ci/scripts/java_jni_manylinux_build.sh
index 211dff8cf2..4e1192a4db 100755
--- a/ci/scripts/java_jni_manylinux_build.sh
+++ b/ci/scripts/java_jni_manylinux_build.sh
@@ -72,6 +72,7 @@ cmake \
-DARROW_BUILD_TESTS=ON \
-DARROW_CSV=${ARROW_DATASET} \
-DARROW_DATASET=${ARROW_DATASET} \
+ -DARROW_SUBSTRAIT=${ARROW_DATASET} \
-DARROW_DEPENDENCY_SOURCE="VCPKG" \
-DARROW_DEPENDENCY_USE_SHARED=OFF \
-DARROW_GANDIVA_PC_CXX_FLAGS=${GANDIVA_CXX_FLAGS} \
diff --git a/ci/scripts/java_jni_windows_build.sh b/ci/scripts/java_jni_windows_build.sh
index 954c04050f..778ee96967 100755
--- a/ci/scripts/java_jni_windows_build.sh
+++ b/ci/scripts/java_jni_windows_build.sh
@@ -61,6 +61,7 @@ cmake \
-DARROW_BUILD_TESTS=ON \
-DARROW_CSV=${ARROW_DATASET} \
-DARROW_DATASET=${ARROW_DATASET} \
+ -DARROW_SUBSTRAIT=${ARROW_DATASET} \
-DARROW_DEPENDENCY_USE_SHARED=OFF \
-DARROW_ORC=${ARROW_ORC} \
-DARROW_PARQUET=${ARROW_PARQUET} \
diff --git a/docs/source/java/index.rst b/docs/source/java/index.rst
index a1e924f9c0..9b555e297b 100644
--- a/docs/source/java/index.rst
+++ b/docs/source/java/index.rst
@@ -37,6 +37,7 @@ on the Arrow format and other language bindings see the :doc:`parent documentati
flight_sql
flight_sql_jdbc_driver
dataset
+ substrait
cdata
jdbc
Reference (javadoc) <reference/index>
diff --git a/docs/source/java/substrait.rst b/docs/source/java/substrait.rst
new file mode 100644
index 0000000000..41effedbf0
--- /dev/null
+++ b/docs/source/java/substrait.rst
@@ -0,0 +1,107 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements. See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership. The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License. You may obtain a copy of the License at
+
+.. http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied. See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero <../cpp/streaming_execution>`
+query engine.
+
+Executing Substrait Plans
+=========================
+
+Plans can reference data in files via URIs, or "named tables" that must be provided along with the plan.
+
+Here is an example of a Java program that queries a Parquet file using Java Substrait
+(this example uses the `Substrait Java`_ project to compile a SQL query to a Substrait plan):
+
+.. code-block:: Java
+
+ import com.google.common.collect.ImmutableList;
+ import io.substrait.isthmus.SqlToSubstrait;
+ import io.substrait.proto.Plan;
+ import org.apache.arrow.dataset.file.FileFormat;
+ import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+ import org.apache.arrow.dataset.jni.NativeMemoryPool;
+ import org.apache.arrow.dataset.scanner.ScanOptions;
+ import org.apache.arrow.dataset.scanner.Scanner;
+ import org.apache.arrow.dataset.source.Dataset;
+ import org.apache.arrow.dataset.source.DatasetFactory;
+ import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+ import org.apache.arrow.memory.BufferAllocator;
+ import org.apache.arrow.memory.RootAllocator;
+ import org.apache.arrow.vector.ipc.ArrowReader;
+ import org.apache.calcite.sql.parser.SqlParseException;
+
+ import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
+
+ public class ClientSubstrait {
+ public static void main(String[] args) {
+ String uri = "file:///data/tpch_parquet/nation.parquet";
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ BufferAllocator allocator = new RootAllocator();
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, uri);
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ // map table to reader
+ Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+ mapTableToArrowReader.put("NATION", reader);
+ // get binary plan
+ Plan plan = getPlan();
+ ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
+ substraitPlan.put(plan.toByteArray());
+ // run query
+ try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
+ substraitPlan,
+ mapTableToArrowReader
+ )) {
+ while (arrowReader.loadNextBatch()) {
+ System.out.println(arrowReader.getVectorSchemaRoot().contentToTSVString());
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ static Plan getPlan() throws SqlParseException {
+ String sql = "SELECT * from nation";
+ String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
+ "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+ SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+ Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
+ return plan;
+ }
+ }
+
+.. code-block:: text
+
+ // Results example:
+ FieldPath(0) FieldPath(1) FieldPath(2) FieldPath(3)
+ 0 ALGERIA 0 haggle. carefully final deposits detect slyly agai
+ 1 ARGENTINA 1 al foxes promise slyly according to the regular accounts. bold requests alon
+
+.. _`Substrait`: https://substrait.io/
+.. _`Substrait Java`: https://github.com/substrait-io/substrait-java
+.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
\ No newline at end of file
diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt
index 315163a537..ede3ee7330 100644
--- a/java/dataset/CMakeLists.txt
+++ b/java/dataset/CMakeLists.txt
@@ -16,6 +16,7 @@
# under the License.
find_package(ArrowDataset REQUIRED)
+find_package(ArrowSubstrait REQUIRED)
include_directories(${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}
${JNI_INCLUDE_DIRS} ${JNI_HEADERS_DIR})
@@ -26,14 +27,18 @@ add_jar(arrow_java_jni_dataset_jar
src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
src/main/java/org/apache/arrow/dataset/jni/NativeMemoryPool.java
src/main/java/org/apache/arrow/dataset/jni/ReservationListener.java
+ src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
GENERATE_NATIVE_HEADERS
arrow_java_jni_dataset_headers)
add_library(arrow_java_jni_dataset SHARED src/main/cpp/jni_wrapper.cc
src/main/cpp/jni_util.cc)
set_property(TARGET arrow_java_jni_dataset PROPERTY OUTPUT_NAME "arrow_dataset_jni")
-target_link_libraries(arrow_java_jni_dataset arrow_java_jni_dataset_headers jni
- ArrowDataset::arrow_dataset_static)
+target_link_libraries(arrow_java_jni_dataset
+ arrow_java_jni_dataset_headers
+ jni
+ ArrowDataset::arrow_dataset_static
+ ArrowSubstrait::arrow_substrait_static)
if(BUILD_TESTING)
add_executable(arrow-java-jni-dataset-test src/main/cpp/jni_util_test.cc
diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc
index b3b5fe18c7..48191eac49 100644
--- a/java/dataset/src/main/cpp/jni_wrapper.cc
+++ b/java/dataset/src/main/cpp/jni_wrapper.cc
@@ -16,6 +16,7 @@
// under the License.
#include <mutex>
+#include <unordered_map>
#include "arrow/array.h"
#include "arrow/array/concatenate.h"
@@ -24,12 +25,14 @@
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/localfs.h"
+#include "arrow/engine/substrait/util.h"
#include "arrow/ipc/api.h"
#include "arrow/util/iterator.h"
#include "jni_util.h"
#include "org_apache_arrow_dataset_file_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_JniWrapper.h"
#include "org_apache_arrow_dataset_jni_NativeMemoryPool.h"
+#include "org_apache_arrow_dataset_substrait_JniWrapper.h"
namespace {
@@ -261,6 +264,52 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
default_memory_pool_id = -1L;
}
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements
+/// encode (1) the table name and (2) the decimal string form of the memory
+/// address of an ArrowArrayStream containing the table data. Each stream is
+/// imported and eagerly read into an arrow::Table.
+///
+/// Throws (via JniThrow / JniGetOrThrow) if the array has an odd length, if an
+/// address cannot be parsed, or if importing or reading a stream fails.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(JNIEnv* env, const jobjectArray& str_array) {
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_table;
+  int length = env->GetArrayLength(str_array);
+  if (length % 2 != 0) {
+    JniThrow("Can not map odd number of array elements to key/value pairs");
+  }
+  for (int pos = 0; pos < length; pos += 2) {
+    auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos + 1));
+    std::string table_name = JStringToCString(env, j_string_key);
+    uintptr_t memory_address = 0;
+    try {
+      // The address originates from a Java long (signed 64-bit). std::stol is
+      // not sufficient: `long` is only 32 bits on LLP64 platforms such as
+      // Windows, where a real address would fail with std::out_of_range.
+      memory_address = static_cast<uintptr_t>(std::stoll(JStringToCString(env, j_string_value)));
+    } catch (const std::exception& ex) {
+      JniThrow("Failed to parse memory address from string value. Error: " + std::string(ex.what()));
+    } catch (...) {
+      JniThrow("Failed to parse memory address from string value.");
+    }
+    // Release the per-iteration local references so that passing many tables
+    // does not exhaust the JNI local reference table.
+    env->DeleteLocalRef(j_string_key);
+    env->DeleteLocalRef(j_string_value);
+    auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(memory_address);
+    std::shared_ptr<arrow::RecordBatchReader> reader_in =
+        JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+    map_table_to_table[table_name] = JniGetOrThrow(reader_in->ToTable());
+  }
+  return map_table_to_table;
+}
+
+/// Look up the arrow Table registered under a single-part table name.
+///
+/// Only flat (one-element) names are accepted. Throws via JniThrow when the
+/// name is hierarchical or when no table was provided under that name.
+std::shared_ptr<arrow::Table> GetTableByName(const std::vector<std::string>& names,
+                                             const std::unordered_map<std::string, std::shared_ptr<arrow::Table>>& tables) {
+  if (names.size() != 1) {
+    JniThrow("Tables with hierarchical names are not supported");
+  }
+  const std::string& table_name = names.front();
+  auto found = tables.find(table_name);
+  if (found == tables.end()) {
+    JniThrow("Table is referenced, but not provided: " + table_name);
+  }
+  return found->second;
+}
+
/*
* Class: org_apache_arrow_dataset_jni_NativeMemoryPool
* Method: getDefaultMemoryPool
@@ -578,3 +627,72 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
JNI_METHOD_END()
}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlan
+ * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlan__Ljava_lang_String_2_3Ljava_lang_String_2J(
+    JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input,
+    jlong memory_address_output) {
+  JNI_METHOD_START
+  // Materialize every named table handed over from Java (name -> Table).
+  auto named_tables = LoadNamedTables(env, table_to_memory_address_input);
+  // Resolve each named-table reference in the plan to a "table_source"
+  // declaration backed by the corresponding in-memory Table.
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider =
+      [&named_tables](const std::vector<std::string>& names, const arrow::Schema&) {
+        std::shared_ptr<arrow::Table> table = GetTableByName(names, named_tables);
+        std::shared_ptr<arrow::acero::ExecNodeOptions> source_options =
+            std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(table));
+        return arrow::acero::Declaration("table_source", {}, source_options, "java_source");
+      };
+  // Convert the JSON plan into its serialized form, then run it.
+  std::string json_plan = JStringToCString(env, plan);
+  std::shared_ptr<arrow::Buffer> serialized_plan =
+      JniGetOrThrow(arrow::engine::SerializeJsonPlan(json_plan));
+  std::shared_ptr<arrow::RecordBatchReader> reader_out = JniGetOrThrow(
+      arrow::engine::ExecuteSerializedPlan(*serialized_plan, nullptr, nullptr, conversion_options));
+  // Hand the result back to Java through the caller-allocated C stream struct.
+  auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader_out, arrow_stream_out));
+  JNI_METHOD_END()
+}
+
+/*
+ * Class:     org_apache_arrow_dataset_substrait_JniWrapper
+ * Method:    executeSerializedPlan
+ * Signature: (Ljava/nio/ByteBuffer;[Ljava/lang/String;J)V
+ *
+ * The plan must be a direct ByteBuffer; its full capacity is copied and
+ * interpreted as the serialized Substrait plan.
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlan__Ljava_nio_ByteBuffer_2_3Ljava_lang_String_2J(
+    JNIEnv* env, jobject, jobject plan, jobjectArray table_to_memory_address_input,
+    jlong memory_address_output) {
+  JNI_METHOD_START
+  // get mapping of table name to memory address
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader =
+      LoadNamedTables(env, table_to_memory_address_input);
+  // create table provider
+  arrow::engine::NamedTableProvider table_provider =
+      [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) {
+        std::shared_ptr<arrow::Table> output_table = GetTableByName(names, map_table_to_reader);
+        std::shared_ptr<arrow::acero::ExecNodeOptions> options =
+            std::make_shared<arrow::acero::TableSourceNodeOptions>(std::move(output_table));
+        return arrow::acero::Declaration("table_source", {}, options, "java_source");
+      };
+  arrow::engine::ConversionOptions conversion_options;
+  conversion_options.named_table_provider = std::move(table_provider);
+  // mapping arrow::Buffer
+  auto* buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan));
+  if (buff == nullptr) {
+    // GetDirectBufferAddress returns NULL for heap (non-direct) buffers or when
+    // the JVM does not support direct buffer access; copying from it would crash.
+    JniThrow("Substrait plan must be provided as a direct ByteBuffer");
+  }
+  // GetDirectBufferCapacity returns a jlong (-1 on failure); keep it 64-bit.
+  jlong length = env->GetDirectBufferCapacity(plan);
+  if (length < 0) {
+    JniThrow("Unable to determine the capacity of the Substrait plan ByteBuffer");
+  }
+  std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length));
+  std::memcpy(buffer->mutable_data(), buff, static_cast<size_t>(length));
+  // execute plan
+  std::shared_ptr<arrow::RecordBatchReader> reader_out =
+      JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, nullptr, nullptr, conversion_options));
+  auto* arrow_stream_out = reinterpret_cast<ArrowArrayStream*>(memory_address_output);
+  JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader_out, arrow_stream_out));
+  JNI_METHOD_END()
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java
new file mode 100644
index 0000000000..d5a29ad4e9
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Entry point for running Substrait plans on the Acero engine from Java.
+ *
+ * <p>Plans may be supplied either as JSON text ({@link String}) or as a serialized binary plan
+ * ({@link ByteBuffer}). Data for named tables referenced by a plan is supplied as {@link ArrowReader}s.
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Run a JSON Substrait plan without providing any named tables.
+   *
+   * @param plan The JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Run a JSON Substrait plan.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value.
+   *                    <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   *                    Map<String, ArrowReader> namedTables = new HashMap<>();
+   *                    namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Run a binary Substrait plan without providing any named tables.
+   *
+   * @param plan the binary Substrait plan (read by the native side through its direct address).
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read a binary Substrait plan, execute it and return an ArrowReader to read the Schema and record batches.
+   *
+   * @param plan the binary Substrait plan (read by the native side through its direct address).
+   * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value.
+   *                    <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   *                    Map<String, ArrowReader> namedTables = new HashMap<>();
+   *                    namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /** The native call that runs a plan, given the flattened named-table array and the output stream address. */
+  @FunctionalInterface
+  private interface NativePlanCall {
+    void run(String[] mapTableToMemoryAddress, long memoryAddressOutput);
+  }
+
+  private ArrowReader execute(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return executeWith(namedTables,
+        (tables, output) -> JniWrapper.get().executeSerializedPlan(plan, tables, output));
+  }
+
+  private ArrowReader execute(ByteBuffer plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return executeWith(namedTables,
+        (tables, output) -> JniWrapper.get().executeSerializedPlan(plan, tables, output));
+  }
+
+  /**
+   * Export the named tables, invoke the native call, and import its output stream as an ArrowReader.
+   * The exported input stream structs are always closed afterwards, whether or not the call succeeds.
+   */
+  private ArrowReader executeWith(Map<String, ArrowReader> namedTables, NativePlanCall nativeCall)
+      throws Exception {
+    List<ArrowArrayStream> exportedInputs = new ArrayList<>();
+    try (ArrowArrayStream streamOutput = ArrowArrayStream.allocateNew(this.allocator)) {
+      String[] mapTableToMemoryAddress = getMapTableToMemoryAddress(namedTables, exportedInputs);
+      nativeCall.run(mapTableToMemoryAddress, streamOutput.memoryAddress());
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      AutoCloseables.close(exportedInputs);
+    }
+  }
+
+  /**
+   * Export each reader into a freshly allocated ArrowArrayStream and flatten the mapping into
+   * {@code [name0, address0, name1, address1, ...]}. Every allocated stream is added to
+   * {@code listStreamInput} before exporting, so the caller can close it even if an export fails.
+   */
+  private String[] getMapTableToMemoryAddress(Map<String, ArrowReader> mapTableToArrowReader,
+                                              List<ArrowArrayStream> listStreamInput) {
+    String[] flattened = new String[mapTableToArrowReader.size() * 2];
+    int next = 0;
+    for (Map.Entry<String, ArrowReader> entry : mapTableToArrowReader.entrySet()) {
+      ArrowArrayStream streamInput = ArrowArrayStream.allocateNew(this.allocator);
+      listStreamInput.add(streamInput);
+      Data.exportArrayStream(this.allocator, entry.getValue(), streamInput);
+      flattened[next++] = entry.getKey();
+      flattened[next++] = String.valueOf(streamInput.memoryAddress());
+    }
+    return flattened;
+  }
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
new file mode 100644
index 0000000000..236d1d5616
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * Class that contains native methods to call the Acero C++ Substrait API. It internally depends on the C++
+ * function arrow::engine::ExecuteSerializedPlan. Currently supported input parameters are:
+ * <pre>
+ * - arrow::Buffer: Substrait Plan (JSON or Binary format).
+ * - arrow::engine::ConversionOptions: Mapping for arrow::engine::NamedTableProvider.
+ * </pre>
+ */
+final class JniWrapper {
+  private static final JniWrapper INSTANCE = new JniWrapper();
+
+  private JniWrapper() {
+  }
+
+  /**
+   * Return the singleton instance, ensuring the native dataset library has been loaded first.
+   *
+   * @return the shared JniWrapper instance.
+   */
+  public static JniWrapper get() {
+    JniLoader.get().ensureLoaded();
+    return INSTANCE;
+  }
+
+  /**
+   * Consume the JSON Substrait Plan that contains Named Tables and export the RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param mapTableToMemoryAddressInput the table names at even positions {@code i} and the memory address of
+   *                                     the matching ArrowArrayStream, as a decimal string, at position
+   *                                     {@code i+1}.
+   *                                     <pre>{@code String[] mapTableToMemoryAddress = new String[2];
+   *                                     mapTableToMemoryAddress[0]="NATION";
+   *                                     mapTableToMemoryAddress[1]="140650250895360";}</pre>
+   * @param memoryAddressOutput the memory address where the RecordBatchReader is exported.
+   */
+  public native void executeSerializedPlan(String planInput, String[] mapTableToMemoryAddressInput,
+                                           long memoryAddressOutput);
+
+  /**
+   * Consume the binary Substrait Plan that contains Named Tables and export the RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the binary Substrait plan. It must be a direct ByteBuffer: the native side reads it
+   *                  through its direct address and uses the buffer's full capacity as the plan length.
+   * @param mapTableToMemoryAddressInput the table names at even positions {@code i} and the memory address of
+   *                                     the matching ArrowArrayStream, as a decimal string, at position
+   *                                     {@code i+1}.
+   *                                     <pre>{@code String[] mapTableToMemoryAddress = new String[2];
+   *                                     mapTableToMemoryAddress[0]="NATION";
+   *                                     mapTableToMemoryAddress[1]="140650250895360";}</pre>
+   * @param memoryAddressOutput the memory address where the RecordBatchReader is exported.
+   */
+  public native void executeSerializedPlan(ByteBuffer planInput, String[] mapTableToMemoryAddressInput,
+                                           long memoryAddressOutput);
+}
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
index 2516c40959..af2abeee21 100644
--- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestDataset.java
@@ -31,6 +31,7 @@ import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -41,8 +42,9 @@ import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.After;
import org.junit.Before;
+
public abstract class TestDataset {
- private RootAllocator allocator = null;
+ private BufferAllocator allocator = null;
@Before
public void setUp() {
@@ -54,7 +56,7 @@ public abstract class TestDataset {
allocator.close();
}
- protected RootAllocator rootAllocator() {
+ protected BufferAllocator rootAllocator() {
return allocator;
}
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
new file mode 100644
index 0000000000..c23b7e0028
--- /dev/null
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+ @ClassRule
+ public static final TemporaryFolder TMP = new TemporaryFolder();
+ public static final String AVRO_SCHEMA_USER = "user.avsc";
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    // Query:
+    //   SELECT id, name FROM Users
+    // Isthmus:
+    //   ./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    // VARCHAR(150) -> is mapping to -> {ARROW:extension:name=varchar, ARROW:extension:metadata=varchar{length:150}}
+    Map<String, String> nameFieldMetadata = new HashMap<>();
+    nameFieldMetadata.put("ARROW:extension:name", "varchar");
+    nameFieldMetadata.put("ARROW:extension:metadata", "varchar{length:150}");
+    final Schema expectedSchema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        new Field("NAME", new FieldType(true, new ArrowType.Utf8(), null, nameFieldMetadata), null)
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    // The plan resource refers to its input through a placeholder, substituted
+    // here with the URI of the Parquet file just written.
+    String planTemplate = new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+        .getResource("substrait/local_files_users.json").toURI())));
+    String plan = planTemplate.replace("FILENAME_PLACEHOLDER", writeSupport.getOutputURI());
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(plan)) {
+      assertEquals(expectedSchema, arrowReader.getVectorSchemaRoot().getSchema());
+      int totalRows = 0;
+      while (arrowReader.loadNextBatch()) {
+        totalRows += arrowReader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, totalRows);
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    // Query:
+    //   SELECT id, name FROM Users
+    // Isthmus:
+    //   ./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    // Despite the method name, the plan reads the named table "USERS", which is
+    // supplied here as a scan over a temporary Parquet file.
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS", reader);
+      try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI()))),
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+ @Test
+ public void testRunQueryNamedTableNationWithException() throws Exception {
+ //Query:
+ //SELECT id, name FROM Users
+ //Isthmus:
+ //./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+ // The plan reads the named table "USERS", but the map below only provides "USERS_INVALID_MAP",
+ // so running the query must fail with a RuntimeException. The expectation wraps only the query
+ // (not the Parquet/dataset setup), so a RuntimeException raised during setup fails this test
+ // instead of making it pass by accident.
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+ mapTableToArrowReader.put("USERS_INVALID_MAP", reader);
+ String plan = new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+ .getResource("substrait/named_table_users.json").toURI())), java.nio.charset.StandardCharsets.UTF_8);
+ try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+ .runQuery(plan, mapTableToArrowReader)) {
+ // Also drain the result, in case the failure is only surfaced while reading batches.
+ while (arrowReader.loadNextBatch()) {
+ // the rows themselves are irrelevant here
+ }
+ org.junit.Assert.fail("Expected a RuntimeException: table USERS is missing from the named-table map");
+ } catch (RuntimeException expected) {
+ // expected: the plan's named table cannot be resolved
+ }
+ }
+ }
+
+ @Test
+ public void testRunBinaryQueryNamedTableNation() throws Exception {
+ //Query:
+ //SELECT id, name FROM Users
+ //Isthmus:
+ //./isthmus-macOS-0.7.0 -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+ final Schema schema = new Schema(Arrays.asList(
+ Field.nullable("ID", new ArrowType.Int(32, true)),
+ Field.nullable("NAME", new ArrowType.Utf8())
+ ), Collections.emptyMap());
+ // Produced with Base64.getEncoder().encodeToString(plan.toByteArray());
+ String binaryPlan =
+ "Gl8SXQpROk8KBhIECgICAxIvCi0KAgoAEh4KAklECgROQU1FEhIKBCoCEAEKC" +
+ "LIBBQiWARgBGAI6BwoFVVNFUlMaCBIGCgISACIAGgoSCAoEEgIIASIAEgJJRBIETkFNRQ==";
+ ParquetWriteSupport writeSupport = ParquetWriteSupport
+ .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+ ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+ try (
+ DatasetFactory datasetFactory = new FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+ FileFormat.PARQUET, writeSupport.getOutputURI());
+ Dataset dataset = datasetFactory.finish();
+ Scanner scanner = dataset.newScan(options);
+ ArrowReader reader = scanner.scanBatches()
+ ) {
+ // map table to reader
+ Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+ mapTableToArrowReader.put("USERS", reader);
+ // get binary plan; flip() after put() so position/limit cover exactly the plan bytes
+ byte[] plan = Base64.getDecoder().decode(binaryPlan);
+ ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.length);
+ substraitPlan.put(plan).flip();
+ // run query
+ try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator()).runQuery(
+ substraitPlan,
+ mapTableToArrowReader
+ )) {
+ assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+ int rowcount = 0;
+ while (arrowReader.loadNextBatch()) {
+ rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+ }
+ assertEquals(3, rowcount);
+ }
+ }
+ }
+}
diff --git a/java/dataset/src/test/resources/avroschema/user.avsc b/java/dataset/src/test/resources/avroschema/user.avsc
index 072b643912..5a4635b6dc 100644
--- a/java/dataset/src/test/resources/avroschema/user.avsc
+++ b/java/dataset/src/test/resources/avroschema/user.avsc
@@ -18,7 +18,7 @@
{
"namespace": "org.apache.arrow.dataset",
"type": "record",
- "name": "User",
+ "name": "Users",
"fields": [
{"name": "id", "type": ["int", "null"]},
{"name": "name", "type": ["string", "null"]}
diff --git a/java/dataset/src/test/resources/substrait/local_files_users.json b/java/dataset/src/test/resources/substrait/local_files_users.json
new file mode 100644
index 0000000000..a2f5af1b3b
--- /dev/null
+++ b/java/dataset/src/test/resources/substrait/local_files_users.json
@@ -0,0 +1,75 @@
+{
+ "extensionUris": [],
+ "extensions": [],
+ "relations": [{
+ "root": {
+ "input": {
+ "project": {
+ "common": {
+ "emit": {
+ "outputMapping": [2, 3]
+ }
+ },
+ "input": {
+ "read": {
+ "common": {
+ "direct": {
+ }
+ },
+ "baseSchema": {
+ "names": ["ID", "NAME"],
+ "struct": {
+ "types": [{
+ "i32": {
+ "typeVariationReference": 0,
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }, {
+ "varchar": {
+ "length": 150,
+ "typeVariationReference": 0,
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }],
+ "typeVariationReference": 0,
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ },
+ "localFiles": {
+ "items": [
+ {
+ "uriFile": "FILENAME_PLACEHOLDER",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "expressions": [{
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }, {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 1
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }]
+ }
+ },
+ "names": ["ID", "NAME"]
+ }
+ }],
+ "expectedTypeUrls": []
+}
\ No newline at end of file
diff --git a/java/dataset/src/test/resources/substrait/named_table_users.json b/java/dataset/src/test/resources/substrait/named_table_users.json
new file mode 100644
index 0000000000..629eebd059
--- /dev/null
+++ b/java/dataset/src/test/resources/substrait/named_table_users.json
@@ -0,0 +1,70 @@
+{
+ "extensionUris": [],
+ "extensions": [],
+ "relations": [{
+ "root": {
+ "input": {
+ "project": {
+ "common": {
+ "emit": {
+ "outputMapping": [2, 3]
+ }
+ },
+ "input": {
+ "read": {
+ "common": {
+ "direct": {
+ }
+ },
+ "baseSchema": {
+ "names": ["ID", "NAME"],
+ "struct": {
+ "types": [{
+ "i32": {
+ "typeVariationReference": 0,
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }, {
+ "varchar": {
+ "length": 150,
+ "typeVariationReference": 0,
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }],
+ "typeVariationReference": 0,
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ },
+ "namedTable": {
+ "names": ["USERS"]
+ }
+ }
+ },
+ "expressions": [{
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }, {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 1
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }]
+ }
+ },
+ "names": ["ID", "NAME"]
+ }
+ }],
+ "expectedTypeUrls": []
+}
\ No newline at end of file
diff --git a/java/pom.xml b/java/pom.xml
index 05a895d5d5..2a7a3b4920 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -151,6 +151,7 @@
<exclude>**/*.tbl</exclude>
<exclude>**/*.iml</exclude>
<exclude>**/flight.properties</exclude>
+ <exclude>**/*.idea/**</exclude>
</excludes>
</configuration>
</plugin>