Posted to commits@arrow.apache.org by li...@apache.org on 2023/06/05 19:04:43 UTC

[arrow-cookbook] branch main updated: GH-309: [Java] Initial Substrait Plan documentation (Query Dataset) (#310)

This is an automated email from the ASF dual-hosted git repository.

lidavidm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-cookbook.git


The following commit(s) were added to refs/heads/main by this push:
     new 30228bf  GH-309: [Java] Initial Substrait Plan documentation (Query Dataset) (#310)
30228bf is described below

commit 30228bf15dc1fa329e1cf25334e1f735cfc9ea0f
Author: david dali susanibar arce <da...@gmail.com>
AuthorDate: Mon Jun 5 14:04:37 2023 -0500

    GH-309: [Java] Initial Substrait Plan documentation (Query Dataset) (#310)
    
    Co-authored-by: David Li <li...@gmail.com>
---
 java/source/demo/pom.xml                  |  10 ++
 java/source/flight.rst                    |   4 +-
 java/source/index.rst                     |   1 +
 java/source/substrait.rst                 | 206 ++++++++++++++++++++++++++++++
 java/thirdpartydeps/tpch/customer.parquet | Bin 0 -> 1270691 bytes
 java/thirdpartydeps/tpch/nation.parquet   | Bin 0 -> 2319 bytes
 6 files changed, 219 insertions(+), 2 deletions(-)

diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml
index a2bee04..376db3a 100644
--- a/java/source/demo/pom.xml
+++ b/java/source/demo/pom.xml
@@ -97,5 +97,15 @@
             <artifactId>guava</artifactId>
             <version>30.1.1-jre</version>
         </dependency>
+        <dependency>
+            <groupId>io.substrait</groupId>
+            <artifactId>isthmus</artifactId>
+            <version>0.11.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.substrait</groupId>
+            <artifactId>core</artifactId>
+            <version>0.11.0</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/java/source/flight.rst b/java/source/flight.rst
index 5e18167..53017b2 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -287,7 +287,7 @@ Flight Client and Server
    S1: Server (Location): Listening on port 33333
    C1: Client (Location): Connected to grpc+tcp://0.0.0.0:33333
    C2: Client (Populate Data): Wrote 2 batches with 3 rows each
-   C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6}
+   C3: Client (Get Metadata): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false}
    C4: Client (Get Stream):
    Client Received batch #1, Data:
    name
@@ -299,7 +299,7 @@ Flight Client and Server
    Manuel
    Felipe
    JJ
-   C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6}
+   C5: Client (List Flights Info): FlightInfo{schema=Schema<name: Utf8>, descriptor=profiles, endpoints=[FlightEndpoint{locations=[Location{uri=grpc+tcp://0.0.0.0:33333}], ticket=org.apache.arrow.flight.Ticket@58871b0a}], bytes=-1, records=6, ordered=false}
    C6: Client (Do Delete Action): Delete completed
    C7: Client (List Flights Info): After delete - No records
    C8: Server shut down successfully
diff --git a/java/source/index.rst b/java/source/index.rst
index 1ae98bb..63f94c0 100644
--- a/java/source/index.rst
+++ b/java/source/index.rst
@@ -39,6 +39,7 @@ This cookbook is tested with Apache Arrow |version|.
    io
    flight
    dataset
+   substrait
    data
    avro
    jdbc
diff --git a/java/source/substrait.rst b/java/source/substrait.rst
new file mode 100644
index 0000000..ee87371
--- /dev/null
+++ b/java/source/substrait.rst
@@ -0,0 +1,206 @@
+.. Licensed to the Apache Software Foundation (ASF) under one
+.. or more contributor license agreements.  See the NOTICE file
+.. distributed with this work for additional information
+.. regarding copyright ownership.  The ASF licenses this file
+.. to you under the Apache License, Version 2.0 (the
+.. "License"); you may not use this file except in compliance
+.. with the License.  You may obtain a copy of the License at
+
+..   http://www.apache.org/licenses/LICENSE-2.0
+
+.. Unless required by applicable law or agreed to in writing,
+.. software distributed under the License is distributed on an
+.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+.. KIND, either express or implied.  See the License for the
+.. specific language governing permissions and limitations
+.. under the License.
+
+.. _arrow-substrait:
+
+=========
+Substrait
+=========
+
+Arrow can use `Substrait`_, a cross-language specification for query plans,
+to integrate with other engines and languages.
+
+.. contents::
+
+Querying Datasets
+=================
+
+The Substrait support in Arrow combines :doc:`Dataset <dataset>` and
+`substrait-java`_ to query datasets using `Acero`_ as a backend (a minimal
+sketch of the overall flow follows the list below).
+
+Acero currently supports:
+
+- Reading Arrow, CSV, ORC, and Parquet files
+- Filters
+- Projections
+- Joins
+- Aggregates
+
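+At a high level, the flow is: translate a SQL query into a Substrait
+``Plan`` with Isthmus, serialize the plan into a direct ``ByteBuffer``,
+and hand it to ``AceroSubstraitConsumer`` together with an ``ArrowReader``
+for each table the plan references. A minimal sketch of just that hand-off
+(imports as in the full example below; the table definition and reader are
+illustrative):
+
+.. code-block:: java
+
+    static void runPlan(BufferAllocator allocator, ArrowReader reader) throws Exception {
+        // compile SQL against a table definition into a Substrait plan
+        String ddl = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL)";
+        Plan plan = new SqlToSubstrait().execute(
+            "SELECT N_NATIONKEY FROM NATION", ImmutableList.of(ddl));
+        // the native consumer reads the serialized plan from off-heap memory
+        ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
+        substraitPlan.put(plan.toByteArray());
+        // bind each table name referenced by the plan to an ArrowReader
+        try (ArrowReader result = new AceroSubstraitConsumer(allocator)
+                .runQuery(substraitPlan, Map.of("NATION", reader))) {
+            while (result.loadNextBatch()) {
+                // consume result.getVectorSchemaRoot() here
+            }
+        }
+    }
+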
+Here is a complete Java program that queries a Parquet file:
+
+.. testcode::
+
+    import com.google.common.collect.ImmutableList;
+    import io.substrait.isthmus.SqlToSubstrait;
+    import io.substrait.proto.Plan;
+    import org.apache.arrow.dataset.file.FileFormat;
+    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+    import org.apache.arrow.dataset.jni.NativeMemoryPool;
+    import org.apache.arrow.dataset.scanner.ScanOptions;
+    import org.apache.arrow.dataset.scanner.Scanner;
+    import org.apache.arrow.dataset.source.Dataset;
+    import org.apache.arrow.dataset.source.DatasetFactory;
+    import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+    import org.apache.calcite.sql.parser.SqlParseException;
+
+    import java.nio.ByteBuffer;
+    import java.util.HashMap;
+    import java.util.Map;
+
+    static Plan queryTableNation() throws SqlParseException {
+       String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
+       String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
+               "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+       SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+       Plan plan = sqlToSubstrait.execute(sql, ImmutableList.of(nation));
+       return plan;
+    }
+
+    static void queryDatasetThruSubstraitPlanDefinition() {
+       String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+       ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+       try (
+           BufferAllocator allocator = new RootAllocator();
+           DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+                   FileFormat.PARQUET, uri);
+           Dataset dataset = datasetFactory.finish();
+           Scanner scanner = dataset.newScan(options);
+           ArrowReader reader = scanner.scanBatches()
+       ) {
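+           // bind the table name used in the SQL ("NATION") to the dataset's reader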
+           Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+           mapTableToArrowReader.put("NATION", reader);
+           // build the Substrait plan and serialize it into a direct ByteBuffer
+           Plan plan = queryTableNation();
+           ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
+           substraitPlan.put(plan.toByteArray());
+           // run query
+           try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
+               substraitPlan,
+               mapTableToArrowReader
+           )) {
+               while (arrowReader.loadNextBatch()) {
+                   System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+               }
+           }
+       } catch (Exception e) {
+           e.printStackTrace();
+       }
+    }
+
+    queryDatasetThruSubstraitPlanDefinition();
+
+.. testoutput::
+
+    N_NATIONKEY    N_NAME    N_REGIONKEY    N_COMMENT
+    17    PERU    1    platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun
+
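+Because a Substrait plan is just a serialized protobuf message, it does not
+have to be produced in the same process, or even in Java: a plan generated
+by another engine or language can be consumed here as-is. A minimal sketch
+of loading a previously serialized plan from disk (the path is illustrative):
+
+.. code-block:: java
+
+    // read a plan that was serialized elsewhere (e.g. via plan.toByteArray())
+    byte[] planBytes = java.nio.file.Files.readAllBytes(
+        java.nio.file.Paths.get("/tmp/query_plan.bin"));  // illustrative path
+    // copy it into a direct ByteBuffer for AceroSubstraitConsumer.runQuery
+    ByteBuffer substraitPlan = ByteBuffer.allocateDirect(planBytes.length);
+    substraitPlan.put(planBytes);
+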
+It is also possible to query multiple datasets and join them on a common
+key. For example, we can join the nation and customer tables from the
+TPC-H benchmark:
+
+.. testcode::
+
+    import com.google.common.collect.ImmutableList;
+    import io.substrait.isthmus.SqlToSubstrait;
+    import io.substrait.proto.Plan;
+    import org.apache.arrow.dataset.file.FileFormat;
+    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+    import org.apache.arrow.dataset.jni.NativeMemoryPool;
+    import org.apache.arrow.dataset.scanner.ScanOptions;
+    import org.apache.arrow.dataset.scanner.Scanner;
+    import org.apache.arrow.dataset.source.Dataset;
+    import org.apache.arrow.dataset.source.DatasetFactory;
+    import org.apache.arrow.dataset.substrait.AceroSubstraitConsumer;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+    import org.apache.calcite.sql.parser.SqlParseException;
+
+    import java.nio.ByteBuffer;
+    import java.util.HashMap;
+    import java.util.Map;
+
+    static Plan queryTableNationJoinCustomer() throws SqlParseException {
+        String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
+            "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
+            "GROUP BY n.n_name";
+        String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
+            "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+        String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, " +
+            "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, " +
+            "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
+            "C_COMMENT VARCHAR(117) )";
+        SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+        Plan plan = sqlToSubstrait.execute(sql,
+            ImmutableList.of(nation, customer));
+        return plan;
+    }
+
+    static void queryTwoDatasetsThruSubstraitPlanDefinition() {
+        String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+        String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
+        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+        try (
+            BufferAllocator allocator = new RootAllocator();
+            DatasetFactory datasetFactory = new FileSystemDatasetFactory(
+                allocator, NativeMemoryPool.getDefault(),
+                FileFormat.PARQUET, uriNation);
+            Dataset dataset = datasetFactory.finish();
+            Scanner scanner = dataset.newScan(options);
+            ArrowReader readerNation = scanner.scanBatches();
+            DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
+                allocator, NativeMemoryPool.getDefault(),
+                FileFormat.PARQUET, uriCustomer);
+            Dataset datasetCustomer = datasetFactoryCustomer.finish();
+            Scanner scannerCustomer = datasetCustomer.newScan(options);
+            ArrowReader readerCustomer = scannerCustomer.scanBatches()
+        ) {
+            // map table to reader
+            Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+            mapTableToArrowReader.put("NATION", readerNation);
+            mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+            // build the Substrait plan and serialize it into a direct ByteBuffer
+            Plan plan = queryTableNationJoinCustomer();
+            ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
+                plan.toByteArray().length);
+            substraitPlan.put(plan.toByteArray());
+            // run query
+            try (ArrowReader arrowReader = new AceroSubstraitConsumer(
+                allocator).runQuery(
+                substraitPlan,
+                mapTableToArrowReader
+            )) {
+                while (arrowReader.loadNextBatch()) {
+                    System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    queryTwoDatasetsThruSubstraitPlanDefinition();
+
+.. testoutput::
+
+    N_NAME    NUMBER_CUSTOMER
+    PERU    573
+
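+A note on resource management: the allocator, datasets, scanners, and
+readers in these examples are all ``AutoCloseable`` and hold native memory,
+which is why they are opened in try-with-resources blocks. Closing the
+``RootAllocator`` last doubles as a leak check:
+
+.. code-block:: java
+
+    try (BufferAllocator allocator = new RootAllocator()) {
+        // open datasets, scanners, and readers inside this scope
+    } // close() fails here if any allocated buffer has not been released
+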
+.. _`Substrait`: https://substrait.io/
+.. _`substrait-java`: https://github.com/substrait-io/substrait-java
+.. _`Acero`: https://arrow.apache.org/docs/cpp/streaming_execution.html
\ No newline at end of file
diff --git a/java/thirdpartydeps/tpch/customer.parquet b/java/thirdpartydeps/tpch/customer.parquet
new file mode 100644
index 0000000..ba017f2
Binary files /dev/null and b/java/thirdpartydeps/tpch/customer.parquet differ
diff --git a/java/thirdpartydeps/tpch/nation.parquet b/java/thirdpartydeps/tpch/nation.parquet
new file mode 100644
index 0000000..0189118
Binary files /dev/null and b/java/thirdpartydeps/tpch/nation.parquet differ