You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "lidavidm (via GitHub)" <gi...@apache.org> on 2023/04/11 11:37:00 UTC

[GitHub] [arrow] lidavidm commented on a diff in pull request #34227: GH-34223: [Java] Java Substrait Consumer JNI call to ACERO C++

lidavidm commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1162677509


##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -261,6 +264,50 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   default_memory_pool_id = -1L;
 }
 
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements 
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data.  This function will eagerly read all
+/// tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader;
+  int length = env->GetArrayLength(str_array);
+  if (length % 2 != 0) {
+    JniThrow("Can not map odd number of array elements to key/value pairs");
+  }
+  std::shared_ptr<arrow::Table> output_table;
+  for (int pos = 0; pos < length; pos++) {
+    auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    pos++;
+    auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    long memory_address = 0;
+    try {
+      memory_address = std::stol(JStringToCString(env, j_string_value));
+    } catch (...) {
+      JniThrow("Failed to parse memory address from string value");

Review Comment:
   Should this include the exception message?



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -261,6 +264,50 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   default_memory_pool_id = -1L;
 }
 
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements 
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data.  This function will eagerly read all
+/// tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader;
+  int length = env->GetArrayLength(str_array);
+  if (length % 2 != 0) {
+    JniThrow("Can not map odd number of array elements to key/value pairs");
+  }
+  std::shared_ptr<arrow::Table> output_table;
+  for (int pos = 0; pos < length; pos++) {
+    auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    pos++;
+    auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    long memory_address = 0;
+    try {
+      memory_address = std::stol(JStringToCString(env, j_string_value));
+    } catch (...) {
+      JniThrow("Failed to parse memory address from string value");
+    }
+    auto* arrow_stream_in = reinterpret_cast<ArrowArrayStream*>(memory_address);
+    std::shared_ptr<arrow::RecordBatchReader> readerIn = JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream_in));
+    output_table = JniGetOrThrow(readerIn->ToTable());
+    map_table_to_record_batch_reader[JStringToCString(env, j_string_key)] = output_table;
+  }
+  return map_table_to_record_batch_reader;
+}
+
+/// Find the arrow Table associated with a given table name
+std::shared_ptr<arrow::Table> GetTableByName(const std::vector<std::string>& names,
+    std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader) {
+  std::shared_ptr<arrow::Table> output_table;
+  for (const auto& name : names) {
+    output_table = map_table_to_reader[name];
+    if (output_table == nullptr) {
+      JniThrow("Table name " + name + " is needed to execute the Substrait plan");
+    }
+  }
+  return output_table;
+}

Review Comment:
   This still isn't correct. See earlier comments. I believe the test code in the repo is also incorrect.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan The JSON Substrait plan.
+
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.

Review Comment:
   This sentence doesn't apply



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -261,6 +264,50 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   default_memory_pool_id = -1L;
 }
 
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements 
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data.  This function will eagerly read all
+/// tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_record_batch_reader;
+  int length = env->GetArrayLength(str_array);
+  if (length % 2 != 0) {
+    JniThrow("Can not map odd number of array elements to key/value pairs");
+  }
+  std::shared_ptr<arrow::Table> output_table;
+  for (int pos = 0; pos < length; pos++) {
+    auto j_string_key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    pos++;
+    auto j_string_value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    long memory_address = 0;

Review Comment:
   This should be uintptr_t



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan The JSON Substrait plan.
+
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.

Review Comment:
   ```suggestion
   ```



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.Types;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected RootAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name VARCHAR(150));" "SELECT id, name FROM Users"
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, "c");
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+        .runQuery(
+            planReplaceLocalFileURI(
+                new String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+                    .getResource("substrait/local_files_users.json").toURI()))),
+                writeSupport.getOutputURI()
+            ),
+            Collections.EMPTY_MAP
+        )
+    ) {
+      while (arrowReader.loadNextBatch()) {

Review Comment:
   `rowcount` -> `rowCount`
   
   Also, this needs to _sum_ the row counts across all batches returned by `loadNextBatch()`.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.

Review Comment:
   I don't think this sentence applies here.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.
+   *
+   * @param plan The JSON Substrait plan.
+
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches.
+   * Needed to define a mapping name of Tables and theirs ArrowReader representation.

Review Comment:
   I don't think this sentence makes sense; it's already documented in the parameters.



##########
docs/source/java/substrait.rst:
##########
@@ -0,0 +1,107 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+=========
+Substrait
+=========
+
+The ``arrow-dataset`` module can execute Substrait_ plans via the :doc:`Acero <../cpp/streaming_execution>`
+query engine.
+
+Executing Substrait Plans
+=========================
+
+Plans can reference data in files via URIs, or "named tables" that must be provided along with the plan.
+
+Here is an example of a Java program that queries a Parquet file using Java Substrait
+(this example use `Java Substrait`_ project to compile a SQL query to a Substrait plan):

Review Comment:
   The project calls itself "Substrait Java", not "Java Substrait".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org