You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "davisusanibar (via GitHub)" <gi...@apache.org> on 2023/06/05 18:15:59 UTC

[GitHub] [arrow] davisusanibar commented on a diff in pull request #35570: GH-34252: [Java] Support ScannerBuilder::Project or ScannerBuilder::Filter as a Substrait proto extended expression

davisusanibar commented on code in PR #35570:
URL: https://github.com/apache/arrow/pull/35570#discussion_r1218428424


##########
docs/source/java/substrait.rst:
##########
@@ -102,6 +104,335 @@ Here is an example of a Java program that queries a Parquet file using Java Subs
     0	ALGERIA	0	 haggle. carefully final deposits detect slyly agai
     1	ARGENTINA	1	al foxes promise slyly according to the regular accounts. bold requests alon
 
+Executing Projections and Filters Using Extended Expressions
+============================================================
+
+Using `Extended Expression`_, we can leverage the existing Dataset operations to
+also support Projections and Filters. To use Projections and Filters, the
+operations need to be defined using the Extended Expression Java POJO
+classes provided by the `Substrait Java`_ project.
+
+Here is an example of a Java program that queries a Parquet file to project new
+columns and also filter them based on Extended Expression definitions. This example
+shows how to:
+
+- Load the TPCH Parquet file nation.parquet.
+- Produce new Projections and apply a Filter to the dataset using extended expression definitions
+  (column indices below are zero-based).
+    - Expression 01 - CONCAT: N_NAME || ' - ' || N_COMMENT = col 1 || ' - ' || col 3.
+    - Expression 02 - ADD: N_REGIONKEY + 10 = col 2 + 10.
+    - Expression 03 - FILTER: N_NATIONKEY > 18 = col 0 > 18.
+
+.. code-block:: Java
+
+    import java.nio.ByteBuffer;
+    import java.util.ArrayList;
+    import java.util.Arrays;
+    import java.util.Base64;
+    import java.util.HashMap;
+    import java.util.List;
+    import java.util.Optional;
+
+    import org.apache.arrow.dataset.file.FileFormat;
+    import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+    import org.apache.arrow.dataset.jni.NativeMemoryPool;
+    import org.apache.arrow.dataset.scanner.ScanOptions;
+    import org.apache.arrow.dataset.scanner.Scanner;
+    import org.apache.arrow.dataset.source.Dataset;
+    import org.apache.arrow.dataset.source.DatasetFactory;
+    import org.apache.arrow.memory.BufferAllocator;
+    import org.apache.arrow.memory.RootAllocator;
+    import org.apache.arrow.vector.ipc.ArrowReader;
+
+    import com.google.protobuf.InvalidProtocolBufferException;
+    import com.google.protobuf.util.JsonFormat;
+
+    import io.substrait.proto.Expression;
+    import io.substrait.proto.ExpressionReference;
+    import io.substrait.proto.ExtendedExpression;
+    import io.substrait.proto.FunctionArgument;
+    import io.substrait.proto.SimpleExtensionDeclaration;
+    import io.substrait.proto.SimpleExtensionURI;
+    import io.substrait.type.NamedStruct;
+    import io.substrait.type.Type;
+    import io.substrait.type.TypeCreator;
+    import io.substrait.type.proto.TypeProtoConverter;
+
+    public class ClientSubstraitExtendedExpressions {
+      public static void main(String[] args) throws Exception {
+        // create extended expression for: project two new columns + one filter
+        String binaryExtendedExpressions = createExtendedExpresionMessageUsingPOJOClasses();
+        // project and filter dataset using extended expression definition - 03 Expressions:
+        // Expression 01 - CONCAT: N_NAME || ' - ' || N_COMMENT = col 1 || ' - ' || col 3
+        // Expression 02 - ADD: N_REGIONKEY + 10 = col 2 + 10
+        // Expression 03 - FILTER: N_NATIONKEY > 18 = col 0 > 18
+        projectAndFilterDataset(binaryExtendedExpressions);
+      }
+
+      public static void projectAndFilterDataset(String binaryExtendedExpressions) {
+        String uri = "file:///data/tpch_parquet/nation.parquet";
+        byte[] extendedExpressions = Base64.getDecoder().decode(
+            binaryExtendedExpressions);
+        ByteBuffer substraitExtendedExpressions = ByteBuffer.allocateDirect(
+            extendedExpressions.length);
+        substraitExtendedExpressions.put(extendedExpressions);
+        ScanOptions options = new ScanOptions(/*batchSize*/ 32768,
+            Optional.empty(),
+            Optional.of(substraitExtendedExpressions));
+        try (
+            BufferAllocator allocator = new RootAllocator();
+            DatasetFactory datasetFactory = new FileSystemDatasetFactory(
+                allocator, NativeMemoryPool.getDefault(),
+                FileFormat.PARQUET, uri);
+            Dataset dataset = datasetFactory.finish();
+            Scanner scanner = dataset.newScan(options);
+            ArrowReader reader = scanner.scanBatches()
+        ) {
+          while (reader.loadNextBatch()) {
+            System.out.println(
+                reader.getVectorSchemaRoot().contentToTSVString());
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+
+      private static String createExtendedExpresionMessageUsingPOJOClasses() throws InvalidProtocolBufferException {
+        // Expression: N_REGIONKEY + 10 = col 2 + 10
+        Expression.Builder selectionBuilderProjectOne = Expression.newBuilder().
+            setSelection(
+                Expression.FieldReference.newBuilder().
+                    setDirectReference(
+                        Expression.ReferenceSegment.newBuilder().
+                            setStructField(
+                                Expression.ReferenceSegment.StructField.newBuilder().setField(
+                                    2)
+                            )
+                    )
+            );
+        Expression.Builder literalBuilderProjectOne = Expression.newBuilder()
+            .setLiteral(
+                Expression.Literal.newBuilder().setI32(10)
+            );
+        io.substrait.proto.Type outputProjectOne = TypeCreator.NULLABLE.I32.accept(
+            new TypeProtoConverter());
+        Expression.Builder expressionBuilderProjectOne = Expression.
+            newBuilder().
+            setScalarFunction(
+                Expression.
+                    ScalarFunction.
+                    newBuilder().
+                    setFunctionReference(0).
+                    setOutputType(outputProjectOne).
+                    addArguments(
+                        0,
+                        FunctionArgument.newBuilder().setValue(
+                            selectionBuilderProjectOne)
+                    ).
+                    addArguments(
+                        1,
+                        FunctionArgument.newBuilder().setValue(
+                            literalBuilderProjectOne)
+                    )
+            );
+        ExpressionReference.Builder expressionReferenceBuilderProjectOne = ExpressionReference.newBuilder().
+            setExpression(expressionBuilderProjectOne)
+            .addOutputNames("ADD_TEN_TO_COLUMN_N_REGIONKEY");
+
+        // Expression: N_NAME || ' - ' || N_COMMENT = col 1 || ' - ' || col 3
+        Expression.Builder selectionBuilderProjectTwo = Expression.newBuilder().
+            setSelection(
+                Expression.FieldReference.newBuilder().
+                    setDirectReference(
+                        Expression.ReferenceSegment.newBuilder().
+                            setStructField(
+                                Expression.ReferenceSegment.StructField.newBuilder().setField(
+                                    1)
+                            )
+                    )
+            );
+        Expression.Builder selectionBuilderProjectTwoConcatLiteral = Expression.newBuilder()
+            .setLiteral(
+                Expression.Literal.newBuilder().setString(" - ")
+            );
+        Expression.Builder selectionBuilderProjectOneToConcat = Expression.newBuilder().
+            setSelection(
+                Expression.FieldReference.newBuilder().
+                    setDirectReference(
+                        Expression.ReferenceSegment.newBuilder().
+                            setStructField(
+                                Expression.ReferenceSegment.StructField.newBuilder().setField(
+                                    3)
+                            )
+                    )
+            );
+        io.substrait.proto.Type outputProjectTwo = TypeCreator.NULLABLE.STRING.accept(
+            new TypeProtoConverter());
+        Expression.Builder expressionBuilderProjectTwo = Expression.
+            newBuilder().
+            setScalarFunction(
+                Expression.
+                    ScalarFunction.
+                    newBuilder().
+                    setFunctionReference(1).
+                    setOutputType(outputProjectTwo).
+                    addArguments(
+                        0,
+                        FunctionArgument.newBuilder().setValue(
+                            selectionBuilderProjectTwo)
+                    ).
+                    addArguments(
+                        1,
+                        FunctionArgument.newBuilder().setValue(
+                            selectionBuilderProjectTwoConcatLiteral)
+                    ).
+                    addArguments(
+                        2,
+                        FunctionArgument.newBuilder().setValue(
+                            selectionBuilderProjectOneToConcat)
+                    )
+            );
+        ExpressionReference.Builder expressionReferenceBuilderProjectTwo = ExpressionReference.newBuilder().
+            setExpression(expressionBuilderProjectTwo)
+            .addOutputNames("CONCAT_COLUMNS_N_NAME_AND_N_COMMENT");
+
+        // Expression: Filter: N_NATIONKEY > 18 = col 0 > 18
+        Expression.Builder selectionBuilderFilterOne = Expression.newBuilder().
+            setSelection(
+                Expression.FieldReference.newBuilder().
+                    setDirectReference(
+                        Expression.ReferenceSegment.newBuilder().
+                            setStructField(
+                                Expression.ReferenceSegment.StructField.newBuilder().setField(
+                                    0)
+                            )
+                    )
+            );
+        Expression.Builder literalBuilderFilterOne = Expression.newBuilder()
+            .setLiteral(
+                Expression.Literal.newBuilder().setI32(18)
+            );
+        io.substrait.proto.Type outputFilterOne = TypeCreator.NULLABLE.BOOLEAN.accept(
+            new TypeProtoConverter());
+        Expression.Builder expressionBuilderFilterOne = Expression.
+            newBuilder().
+            setScalarFunction(
+                Expression.
+                    ScalarFunction.
+                    newBuilder().
+                    setFunctionReference(2).
+                    setOutputType(outputFilterOne).
+                    addArguments(
+                        0,
+                        FunctionArgument.newBuilder().setValue(
+                            selectionBuilderFilterOne)
+                    ).
+                    addArguments(
+                        1,
+                        FunctionArgument.newBuilder().setValue(
+                            literalBuilderFilterOne)
+                    )
+            );
+        ExpressionReference.Builder expressionReferenceBuilderFilterOne = ExpressionReference.newBuilder().
+            setExpression(expressionBuilderFilterOne)
+            .addOutputNames("COLUMN_N_NATIONKEY_GREATER_THAN_18");
+
+        List<String> columnNames = Arrays.asList("N_NATIONKEY", "N_NAME",
+            "N_REGIONKEY", "N_COMMENT");
+        List<Type> dataTypes = Arrays.asList(
+            TypeCreator.NULLABLE.I32,
+            TypeCreator.NULLABLE.STRING,
+            TypeCreator.NULLABLE.I32,
+            TypeCreator.NULLABLE.STRING
+        );
+        //
+        NamedStruct of = NamedStruct.of(
+            columnNames,
+            Type.Struct.builder().fields(dataTypes).nullable(false).build()
+        );
+
+        // Extensions URI
+        HashMap<String, SimpleExtensionURI> extensionUris = new HashMap<>();
+        extensionUris.put(
+            "key-001",
+            SimpleExtensionURI.newBuilder()
+                .setExtensionUriAnchor(1)
+                .setUri("/functions_arithmetic.yaml")
+                .build()
+        );
+        extensionUris.put(
+            "key-002",
+            SimpleExtensionURI.newBuilder()
+                .setExtensionUriAnchor(2)
+                .setUri("/functions_comparison.yaml")
+                .build()
+        );
+
+        // Extensions
+        ArrayList<SimpleExtensionDeclaration> extensions = new ArrayList<>();
+        SimpleExtensionDeclaration extensionFunctionAdd = SimpleExtensionDeclaration.newBuilder()
+            .setExtensionFunction(
+                SimpleExtensionDeclaration.ExtensionFunction.newBuilder()
+                    .setFunctionAnchor(0)
+                    .setName("add:i32_i32")
+                    .setExtensionUriReference(1))
+            .build();
+        SimpleExtensionDeclaration extensionFunctionGreaterThan = SimpleExtensionDeclaration.newBuilder()
+            .setExtensionFunction(
+                SimpleExtensionDeclaration.ExtensionFunction.newBuilder()
+                    .setFunctionAnchor(1)
+                    .setName("concat:vchar")
+                    .setExtensionUriReference(2))
+            .build();
+        SimpleExtensionDeclaration extensionFunctionLowerThan = SimpleExtensionDeclaration.newBuilder()
+            .setExtensionFunction(
+                SimpleExtensionDeclaration.ExtensionFunction.newBuilder()
+                    .setFunctionAnchor(2)
+                    .setName("gt:any_any")
+                    .setExtensionUriReference(2))
+            .build();
+        extensions.add(extensionFunctionAdd);
+        extensions.add(extensionFunctionGreaterThan);
+        extensions.add(extensionFunctionLowerThan);
+
+        // Extended Expression
+        ExtendedExpression.Builder extendedExpressionBuilder =
+            ExtendedExpression.newBuilder().
+                addReferredExpr(0,
+                    expressionReferenceBuilderProjectOne).
+                addReferredExpr(1,
+                    expressionReferenceBuilderProjectTwo).
+                addReferredExpr(2,
+                    expressionReferenceBuilderFilterOne).
+                setBaseSchema(of.toProto());
+        extendedExpressionBuilder.addAllExtensionUris(extensionUris.values());
+        extendedExpressionBuilder.addAllExtensions(extensions);
+
+        ExtendedExpression extendedExpression = extendedExpressionBuilder.build();

Review Comment:
   The plan is to align this with the latest changes by creating a Java cookbook recipe to test this code snippet after this PR is merged.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org