Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/26 20:37:54 UTC

[GitHub] [beam] ibzib opened a new pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

ibzib opened a new pull request #13200:
URL: https://github.com/apache/beam/pull/13200


   Adds jar loading to https://github.com/apache/beam/pull/12398.
   
   UDFs are specified by providing a jar to ZetaSQL options; support for
   SqlTransform::registerUd(a)?f will be added in a later change.
   
   R: @amaliujia 
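
For illustration only, here is a minimal sketch of the statement shape that the tests in this PR use to declare a Java UDF from a jar. The class name `UdfSqlSketch`, the helper `createFunctionSql`, and the jar path `/tmp/udfs.jar` are hypothetical; only the `CREATE FUNCTION ... LANGUAGE java OPTIONS (path='...')` form is taken from the tests.

```java
final class UdfSqlSketch {
  /**
   * Builds a CREATE FUNCTION statement followed by a query, mirroring the
   * String.format calls in ZetaSqlJavaUdfTest. Hypothetical helper, not part of Beam.
   */
  static String createFunctionSql(
      String signature, String returnType, String jarPath, String query) {
    return String.format(
        "CREATE FUNCTION %s RETURNS %s LANGUAGE java OPTIONS (path='%s'); %s",
        signature, returnType, jarPath, query);
  }

  public static void main(String[] args) {
    // The jar path is a placeholder; the tests read it from a system property.
    System.out.println(
        createFunctionSql(
            "increment(i INT64)", "INT64", "/tmp/udfs.jar", "SELECT increment(0);"));
  }
}
```

In the tests, the resulting string is passed to `ZetaSQLQueryPlanner.convertToBeamRel`.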
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521577848



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.
+  @Test
+  public void testNullArgumentIsNotTypeChecked() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(0);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Could not compile CalcFn");
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION matches(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT matches(\"a\", \"a\"), 'apple'='beta'",

Review comment:
       Yes, you are right. This is expected behavior, since this is just an initial PR whose goal is to "execute Java UDFs", and it may still have places to improve. As follow-up work, I will check for mixed usage of UDF and non-UDF expressions and reject it.







[GitHub] [beam] kennknowles commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521575672



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.
+  @Test
+  public void testNullArgumentIsNotTypeChecked() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(0);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Could not compile CalcFn");
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION matches(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT matches(\"a\", \"a\"), 'apple'='beta'",

Review comment:
       So it does work with a blend of UDFs and non-UDF expressions? Is the `=` executed by the Calcite SQL expression evaluator? (The PR is big enough that I am skimming things and looking at the tests.)

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/ScalarFn.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.Serializable;
+
+/**
+ * A scalar function that can be executed as part of a SQL query. Subclasses must contain exactly
+ * one method annotated with {@link ApplyMethod}, which will be applied to the SQL function

Review comment:
       I guess you can't use lambdas after all?
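
To make the question concrete, here is a rough sketch of the pattern the Javadoc describes: a named subclass with exactly one annotated method, located by reflection. The nested `ScalarFn` and `ApplyMethod` types below are hypothetical stand-ins, not the classes from this PR, and `findApplyMethod` and `invokeApply` are illustrative helpers. A lambda expression has no method declaration to annotate, which may be (as an assumption, not something confirmed in this thread) why a subclass is needed.

```java
import java.io.Serializable;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class ScalarFnSketch {
  /** Hypothetical stand-in for the annotation that marks the function body. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ApplyMethod {}

  /** Hypothetical stand-in for the abstract base class under review. */
  abstract static class ScalarFn implements Serializable {}

  /** Example UDF: the annotated method carries the typed signature. */
  static class IncrementFn extends ScalarFn {
    @ApplyMethod
    public Long increment(Long i) {
      return i + 1;
    }
  }

  /** Returns the single @ApplyMethod method, or throws if there is not exactly one. */
  static Method findApplyMethod(Class<? extends ScalarFn> clazz) {
    List<Method> found = new ArrayList<>();
    for (Method m : clazz.getMethods()) {
      if (m.isAnnotationPresent(ApplyMethod.class)) {
        found.add(m);
      }
    }
    if (found.size() != 1) {
      throw new IllegalArgumentException(
          clazz.getName() + " must have exactly one @ApplyMethod method, found " + found.size());
    }
    return found.get(0);
  }

  /** Invokes the annotated method reflectively, wrapping checked reflection errors. */
  static Object invokeApply(ScalarFn fn, Object... args) {
    try {
      return findApplyMethod(fn.getClass()).invoke(fn, args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Could not invoke @ApplyMethod", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(invokeApply(new IncrementFn(), 41L));
  }
}
```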







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512952689



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
##########
@@ -184,7 +190,9 @@ private BeamRelNode convertToBeamRelInternal(String sql, QueryParameters queryPa
     RelMetadataQuery.THREAD_PROVIDERS.set(
         JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
     root.rel.getCluster().invalidateMetadataQuery();
-    return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
+    BeamRelNode beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
+    LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode));

Review comment:
       I saw we do the same in the Calcite planner: https://github.com/apache/beam/blob/a443fdd4c2bd48c666478168cd1edb4db661103d/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java#L190
   
   Should we downgrade both to the `debug` level?
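
As a sketch of what the downgrade could buy us: at a debug-like level, the plan string need not be built at all when that level is disabled. The example below uses `java.util.logging` purely as a self-contained stand-in (I am assuming the planners use a different logging API), and `PlanLoggingSketch` and `logPlan` are hypothetical names.

```java
import java.util.function.Supplier;
import java.util.logging.Logger;

final class PlanLoggingSketch {
  static final Logger LOG = Logger.getLogger(PlanLoggingSketch.class.getName());

  /**
   * Logs the plan at FINE, the debug-like level of java.util.logging.
   * The supplier is evaluated only when FINE is enabled for this logger,
   * so rendering the plan costs nothing at the default INFO level.
   */
  static void logPlan(Supplier<String> planText) {
    LOG.fine(() -> "BEAMPlan>\n" + planText.get());
  }

  public static void main(String[] args) {
    // Placeholder text standing in for RelOptUtil.toString(beamRelNode).
    logPlan(() -> "placeholder plan text");
  }
}
```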







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r514532639



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -116,6 +139,51 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in {@code createFunctionStmt}. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return String.format(
+        "%s:%s",
+        getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+  }
+
+  static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+    switch (createFunctionStmt.getLanguage().toUpperCase()) {
+      case "JAVA":
+        return createFunctionStmt.getIsAggregate()
+            ? USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS
+            : USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+      case "SQL":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Native SQL aggregate functions are not supported (BEAM-9954).");
+        }
+        return USER_DEFINED_FUNCTIONS;
+      case "PY":
+      case "PYTHON":
+      case "JS":
+      case "JAVASCRIPT":
+        throw new UnsupportedOperationException(
+            String.format(
+                "Function %s uses unsupported language %s.",
+                String.join(".", createFunctionStmt.getNamePath()),
+                createFunctionStmt.getLanguage()));
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Function %s uses unrecognized language %s.",
+                String.join(".", createFunctionStmt.getNamePath()),
+                createFunctionStmt.getLanguage()));
+    }
+  }
+
+  private Function createFunction(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return new Function(
+        createFunctionStmt.getNamePath(),
+        getFunctionGroup(createFunctionStmt),
+        createFunctionStmt.getIsAggregate() ? Mode.AGGREGATE : Mode.SCALAR,

Review comment:
       This method is for `ResolvedCreateFunctionStmt`. `ResolvedCreateTableFunctionStmt` is a separate class altogether.







[GitHub] [beam] apilloud commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
apilloud commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521602604



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -116,6 +144,51 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in {@code statement}. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {

Review comment:
       nit: This is unused?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+
+/** {@link ConverterRule} to replace {@link Calc} with {@link BeamCalcRel}. */
+public class BeamJavaUdfCalcRule extends ConverterRule {
+  public static final BeamJavaUdfCalcRule INSTANCE = new BeamJavaUdfCalcRule();
+
+  private BeamJavaUdfCalcRule() {
+    super(
+        LogicalCalc.class, Convention.NONE, BeamLogicalConvention.INSTANCE, "BeamJavaUdfCalcRule");
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall x) {
+    return ZetaSQLQueryPlanner.hasUdfInProjects(x);

Review comment:
       This needs to reject non-UDF operations in the projects; otherwise you might end up running ZetaSQL operators through `BeamCalcRel`, which currently has no coverage in compliance testing and will introduce data corruption bugs. (If you want a more expansive set of operators, we need to set up the compliance tests to cover them first.)
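
One way to read the request above is sketched below as a toy model, not as Calcite code. The `OpKind` enum and the `matches(List<OpKind>)` signature are hypothetical stand-ins for inspecting the `RexNode`s of a `LogicalCalc`, and treating input references and literals as acceptable is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

final class UdfOnlyMatch {
  // Hypothetical categories for the expressions found in a Calc's program.
  enum OpKind { INPUT_REF, LITERAL, JAVA_UDF, BUILTIN }

  // Match only when the Calc uses at least one Java UDF and no built-in
  // (ZetaSQL) operator. A single built-in operator rejects the whole Calc.
  static boolean matches(List<OpKind> exprs) {
    boolean hasJavaUdf = false;
    for (OpKind kind : exprs) {
      if (kind == OpKind.BUILTIN) {
        return false;
      }
      if (kind == OpKind.JAVA_UDF) {
        hasJavaUdf = true;
      }
    }
    return hasJavaUdf;
  }

  public static void main(String[] args) {
    System.out.println(matches(Arrays.asList(OpKind.INPUT_REF, OpKind.JAVA_UDF)));
    System.out.println(matches(Arrays.asList(OpKind.JAVA_UDF, OpKind.BUILTIN)));
  }
}
```

The difference from a plain "contains a UDF" check is that the scan does not stop at the first UDF it sees; it keeps going so that a mixed Calc is rejected.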

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -116,6 +144,51 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in {@code statement}. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return String.format(
+        "%s:%s",
+        getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+  }
+
+  static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+    switch (createFunctionStmt.getLanguage().toUpperCase()) {
+      case "JAVA":
+        return createFunctionStmt.getIsAggregate()
+            ? USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS
+            : USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+      case "SQL":

Review comment:
       In BigQuery, leaving this field unset means `SQL`. Should we match that?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
##########
@@ -97,6 +109,30 @@ public QueryPlanner createPlanner(
     return modifyRuleSetsForZetaSql(BeamRuleSets.getRuleSets());
   }
 
+  /** Returns true if the argument contains any user-defined Java functions. */
+  static boolean hasUdfInProjects(RelOptRuleCall x) {
+    List<RelNode> resList = x.getRelList();
+    for (RelNode relNode : resList) {
+      if (relNode instanceof LogicalCalc) {
+        LogicalCalc logicalCalc = (LogicalCalc) relNode;
+        for (RexNode rexNode : logicalCalc.getProgram().getExprList()) {
+          if (rexNode instanceof RexCall) {
+            RexCall call = (RexCall) rexNode;
+            if (call.getOperator() instanceof SqlUserDefinedFunction) {
+              SqlUserDefinedFunction udf = (SqlUserDefinedFunction) call.op;
+              if (udf.function instanceof ZetaSqlScalarFunctionImpl) {
+                ZetaSqlScalarFunctionImpl scalarFunction = (ZetaSqlScalarFunctionImpl) udf.function;
+                return scalarFunction.functionGroup.equals(

Review comment:
       Nit: put the constant first to reduce risk of NPE. USER_DEFINED_JAVA_SCALAR_FUNCTIONS.equals(...)
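
A minimal sketch of the constant-first pattern suggested above. The constant's value is a placeholder for illustration; the real constant lives in `SqlAnalyzer` and may hold a different string.

```java
final class ConstantFirstEquals {
  // Placeholder value, assumed for this example only.
  static final String USER_DEFINED_JAVA_SCALAR_FUNCTIONS = "user_defined_java_scalar_functions";

  // Calling equals on the constant is safe when functionGroup is null:
  // String.equals(null) returns false instead of throwing.
  static boolean isJavaScalarGroup(String functionGroup) {
    return USER_DEFINED_JAVA_SCALAR_FUNCTIONS.equals(functionGroup);
  }

  public static void main(String[] args) {
    System.out.println(isJavaScalarGroup(null));
    System.out.println(isJavaScalarGroup(USER_DEFINED_JAVA_SCALAR_FUNCTIONS));
  }
}
```

Writing `functionGroup.equals(CONSTANT)` instead would throw a `NullPointerException` for the first call.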

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -116,6 +144,51 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in {@code statement}. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return String.format(
+        "%s:%s",
+        getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+  }
+
+  static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+    switch (createFunctionStmt.getLanguage().toUpperCase()) {
+      case "JAVA":
+        return createFunctionStmt.getIsAggregate()
+            ? USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS
+            : USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+      case "SQL":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Native SQL aggregate functions are not supported (BEAM-9954).");
+        }
+        return USER_DEFINED_FUNCTIONS;
+      case "PY":
+      case "PYTHON":
+      case "JS":
+      case "JAVASCRIPT":

Review comment:
       nit: How does this list compare with BigQuery? Do we need to reserve these before we implement them?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java
##########
@@ -232,4 +265,8 @@ private AggregateCall convertAggCall(
     return AggregateCall.create(
         sqlAggFunction, false, false, false, argList, -1, RelCollations.EMPTY, returnType, aggName);
   }
+
+  private static RelDataTypeFactory createTypeFactory() {

Review comment:
       nit: Nothing uses this?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java
##########
@@ -198,14 +204,41 @@ private AggregateCall convertAggCall(
               + " aggregation.");
     }
 
-    SqlAggFunction sqlAggFunction =
-        (SqlAggFunction)
-            SqlOperatorMappingTable.ZETASQL_FUNCTION_TO_CALCITE_SQL_OPERATOR.get(
-                aggregateFunctionCall.getFunction().getName());
-    if (sqlAggFunction == null) {
-      throw new UnsupportedOperationException(
-          "Does not support ZetaSQL aggregate function: "
-              + aggregateFunctionCall.getFunction().getName());
+    SqlAggFunction sqlAggFunction;
+    if (aggregateFunctionCall

Review comment:
       nit: USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS.equals(...)

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -644,7 +649,7 @@ private RexNode convertResolvedFunctionCall(
           throw new UnsupportedOperationException(
               "Unsupported function: " + funName + ". Only support TUMBLE, HOP, and SESSION now.");
       }
-    } else if (funGroup.equals("ZetaSQL")) {
+    } else if (funGroup.equals(ZETASQL_FUNCTION_GROUP_NAME)) {

Review comment:
       nit: ZETASQL_FUNCTION_GROUP_NAME.equals(funGroup) (There are a bunch more instances of this pattern; I'm going to stop commenting on them.)

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -98,22 +102,36 @@ public RelRoot rel(String sql, QueryParameters params) {
     SimpleCatalog catalog =
         analyzer.createPopulatedCatalog(defaultSchemaPlus.getName(), options, tables);
 
-    ImmutableMap.Builder<String, ResolvedCreateFunctionStmt> udfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
+        ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, ScalarFn> javaScalarFunctionBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, AggregateFn> javaAggregateFunctionBuilder =
+        ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
-        // ResolvedCreateFunctionStmt does not include the full function name, so build it here.
-        String functionFullName =
-            String.format(
-                "%s:%s",
-                SqlAnalyzer.USER_DEFINED_FUNCTIONS,
-                String.join(".", createFunctionStmt.getNamePath()));
-        udfBuilder.put(functionFullName, createFunctionStmt);
+        if (SqlAnalyzer.getFunctionGroup(createFunctionStmt)

Review comment:
       nit: Could this be a switch statement instead?
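
A sketch of what a switch over the function group could look like. The constant values and the `route` method are assumptions made for illustration; the only hard requirement is that the case labels are compile-time `String` constants.

```java
final class FunctionGroupSwitch {
  // Placeholder values; the real constants are defined in SqlAnalyzer.
  static final String USER_DEFINED_FUNCTIONS = "user_defined_functions";
  static final String USER_DEFINED_JAVA_SCALAR_FUNCTIONS = "user_defined_java_scalar_functions";
  static final String USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS =
      "user_defined_java_aggregate_functions";

  // Hypothetical dispatcher: returns a label for the builder that would
  // receive the function in the planner.
  static String route(String functionGroup) {
    switch (functionGroup) {
      case USER_DEFINED_FUNCTIONS:
        return "sql";
      case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
        return "javaScalar";
      case USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS:
        return "javaAggregate";
      default:
        throw new IllegalArgumentException("Unrecognized function group: " + functionGroup);
    }
  }

  public static void main(String[] args) {
    System.out.println(route(USER_DEFINED_JAVA_SCALAR_FUNCTIONS));
  }
}
```

Note that a `switch` on a null `String` throws a `NullPointerException`, so this form assumes the function group is never null.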







[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-717537691


   @ibzib this PR is 200+ commits behind master. Maybe we could try a rebase and then check the failing precommit tests?





[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513097859



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql.translation;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.ProviderNotFoundException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.extensions.sql.UdfProvider;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads {@link UdfProvider} implementations from user-provided jars.
+ *
+ * <p>All UDFs are loaded and cached for each jar to mitigate IO costs.
+ */
+public class JavaUdfLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(JavaUdfLoader.class);
+
+  /**
+   * Maps the external jar location to the functions the jar defines. Static so it can persist
+   * across multiple SQL transforms.
+   */
+  private static final Map<String, UserFunctionDefinitions> cache = new HashMap<>();
+
+  private static ClassLoader originalClassLoader = null;
+
+  /**
+   * Load a user-defined scalar function from the specified jar.
+   *
+   * <p><strong>WARNING</strong>: The first time a jar is loaded, it is added to the thread's
+   * context {@link ClassLoader} so that the jar can be staged by the runner.
+   */
+  public Method loadScalarFunction(List<String> functionPath, String jarPath) {
+    String functionFullName = String.join(".", functionPath);
+    try {
+      UserFunctionDefinitions functionDefinitions = loadJar(jarPath);
+      if (!functionDefinitions.javaScalarFunctions.containsKey(functionPath)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "No implementation of scalar function %s found in %s.\n"
+                    + " 1. Create a class implementing %s and annotate it with @AutoService(%s.class).\n"
+                    + " 2. Add function %s to the class's userDefinedScalarFunctions implementation.",
+                functionFullName,
+                jarPath,
+                UdfProvider.class.getSimpleName(),
+                UdfProvider.class.getSimpleName(),
+                functionFullName));
+      }
+      return functionDefinitions.javaScalarFunctions.get(functionPath);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load user-defined scalar function %s from %s", functionFullName, jarPath),
+          e);
+    }
+  }
+
+  /**
+   * Load a user-defined aggregate function from the specified jar.
+   *
+   * <p><strong>WARNING</strong>: The first time a jar is loaded, it is added to the thread's
+   * context {@link ClassLoader} so that the jar can be staged by the runner.
+   */
+  public Combine.CombineFn loadAggregateFunction(List<String> functionPath, String jarPath) {
+    String functionFullName = String.join(".", functionPath);
+    try {
+      UserFunctionDefinitions functionDefinitions = loadJar(jarPath);
+      if (!functionDefinitions.javaAggregateFunctions.containsKey(functionPath)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "No implementation of aggregate function %s found in %s.\n"
+                    + " 1. Create a class implementing %s and annotate it with @AutoService(%s.class).\n"
+                    + " 2. Add function %s to the class's userDefinedAggregateFunctions implementation.",
+                functionFullName,
+                jarPath,
+                UdfProvider.class.getSimpleName(),
+                UdfProvider.class.getSimpleName(),
+                functionFullName));
+      }
+      return functionDefinitions.javaAggregateFunctions.get(functionPath);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load user-defined aggregate function %s from %s",
+              functionFullName, jarPath),
+          e);
+    }
+  }
+
+  private ClassLoader createAndSetClassLoader(String inputJarPath) throws IOException {
+    Preconditions.checkArgument(!inputJarPath.isEmpty(), "Jar path cannot be empty.");
+    ResourceId inputJar = FileSystems.matchNewResource(inputJarPath, false /* is directory */);
+    File tmpJar = File.createTempFile("sql-udf-", inputJar.getFilename());
+    FileSystems.copy(
+        Collections.singletonList(inputJar),
+        Collections.singletonList(
+            FileSystems.matchNewResource(tmpJar.getAbsolutePath(), false /* is directory */)));

Review comment:
       I logged the MD5 hash.
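
For readers who want to see what computing such a hash involves, here is a standalone sketch using only the JDK. It is not the code from this PR; the class and method names are made up for the example, and the real change may compute or log the hash differently.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class JarMd5 {
  // Streams the file through an MD5 digest and returns lowercase hex.
  static String md5Hex(Path file) throws IOException {
    MessageDigest digest;
    try {
      digest = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      // MD5 is one of the algorithms every Java platform must provide.
      throw new IllegalStateException(e);
    }
    try (InputStream in = Files.newInputStream(file)) {
      byte[] buffer = new byte[8192];
      int read;
      while ((read = in.read(buffer)) != -1) {
        digest.update(buffer, 0, read);
      }
    }
    StringBuilder hex = new StringBuilder();
    for (byte b : digest.digest()) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }

  public static void main(String[] args) throws IOException {
    // Stand-in for the temporary copy of the user's jar.
    Path tmpJar = Files.createTempFile("sql-udf-", ".jar");
    Files.write(tmpJar, "hello".getBytes(StandardCharsets.UTF_8));
    System.out.println("Copied jar MD5: " + md5Hex(tmpJar));
    Files.delete(tmpJar);
  }
}
```

Logging the hash of the copied file lets a user confirm that the jar staged for the pipeline is the one they intended to load.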







[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-716975150


   Thank you! Will take a look soon!





[GitHub] [beam] ibzib commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-725657503


   > This PR is too large to review effectively. You've already received approval from others, but if you want a complete review I would suggest breaking it up into at least the following smaller, separate changes:
   
   I will try to spin off parts of this PR, starting with #13304.





[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512943846



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -115,6 +136,51 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in {@code statement}. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return String.format(
+        "%s:%s",
+        getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+  }
+
+  private static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+    switch (createFunctionStmt.getLanguage().toUpperCase()) {
+      case "JAVA":
+        return createFunctionStmt.getIsAggregate()
+            ? USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS
+            : USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+      case "SQL":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Native SQL aggregate functions are not supported (BEAM-9954).");
+        }
+        return USER_DEFINED_FUNCTIONS;
+      case "PY":
+      case "PYTHON":
+      case "JS":
+      case "JAVASCRIPT":

Review comment:
       Nice to have these in place!

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -82,8 +80,6 @@
           // Rules for window functions
           ProjectToWindowRule.PROJECT,
           // Rules so we only have to implement Calc
-          FilterCalcMergeRule.INSTANCE,
-          ProjectCalcMergeRule.INSTANCE,

Review comment:
       I have already done this part in the ZetaSQL planner (https://github.com/apache/beam/pull/12400/files), so you don't need to disable these rules in the default rule sets.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -97,22 +102,43 @@ public RelRoot rel(String sql, QueryParameters params) {
     SimpleCatalog catalog =
         analyzer.createPopulatedCatalog(defaultSchemaPlus.getName(), options, tables);
 
-    ImmutableMap.Builder<String, ResolvedCreateFunctionStmt> udfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
+        ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Method> javaScalarFunctionBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Combine.CombineFn> javaAggregateFunctionBuilder =
+        ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
-        // ResolvedCreateFunctionStmt does not include the full function name, so build it here.
-        String functionFullName =
-            String.format(
-                "%s:%s",
-                SqlAnalyzer.USER_DEFINED_FUNCTIONS,
-                String.join(".", createFunctionStmt.getNamePath()));
-        udfBuilder.put(functionFullName, createFunctionStmt);
+        if (createFunctionStmt.getLanguage().toUpperCase().equals("SQL")) {
+          udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+        } else if (createFunctionStmt.getLanguage().toUpperCase().equals("JAVA")) {

Review comment:
       Would it be better to replace `"JAVA"` with a static constant, or to use a helper function such as `isJavaUDF(createFunctionStmt.getLanguage())`?
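
Both options could be combined, roughly as follows. The class name and constants are hypothetical; only the helper name `isJavaUDF` comes from the comment above.

```java
final class UdfLanguages {
  // Hypothetical constants replacing the inline string literals.
  static final String JAVA = "JAVA";
  static final String SQL = "SQL";

  // equalsIgnoreCase on the constant is null-safe and avoids the
  // locale-sensitive toUpperCase() call.
  static boolean isJavaUDF(String language) {
    return JAVA.equalsIgnoreCase(language);
  }

  static boolean isSqlUDF(String language) {
    return SQL.equalsIgnoreCase(language);
  }

  public static void main(String[] args) {
    System.out.println(isJavaUDF("java"));
    System.out.println(isSqlUDF("java"));
  }
}
```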

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -97,22 +102,43 @@ public RelRoot rel(String sql, QueryParameters params) {
     SimpleCatalog catalog =
         analyzer.createPopulatedCatalog(defaultSchemaPlus.getName(), options, tables);
 
-    ImmutableMap.Builder<String, ResolvedCreateFunctionStmt> udfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
+        ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Method> javaScalarFunctionBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Combine.CombineFn> javaAggregateFunctionBuilder =
+        ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
-        // ResolvedCreateFunctionStmt does not include the full function name, so build it here.
-        String functionFullName =
-            String.format(
-                "%s:%s",
-                SqlAnalyzer.USER_DEFINED_FUNCTIONS,
-                String.join(".", createFunctionStmt.getNamePath()));
-        udfBuilder.put(functionFullName, createFunctionStmt);
+        if (createFunctionStmt.getLanguage().toUpperCase().equals("SQL")) {

Review comment:
       same for `"SQL"`

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -82,8 +80,6 @@
           // Rules for window functions
           ProjectToWindowRule.PROJECT,
           // Rules so we only have to implement Calc
-          FilterCalcMergeRule.INSTANCE,
-          ProjectCalcMergeRule.INSTANCE,

Review comment:
       Meanwhile, the ZetaSQL planner probably needs one more rule: a `BeamZetaSQLCalcMergeRule`, which does not merge two Calcs when one has a Java UDF and the other does not. This will be useful when users write a query like:
   ```
   select java_udf(x) from (select builtin_function(y) as x from table) as t;
   ```
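
The merge condition described above can be stated as a tiny predicate. This is only a literal reading of the suggestion, with hypothetical names; an actual rule would have to inspect the Calc nodes themselves.

```java
final class CalcMergePolicy {
  // Merge two adjacent Calcs only when they agree on whether they contain a
  // Java UDF: both do, or neither does.
  static boolean canMerge(boolean topHasJavaUdf, boolean bottomHasJavaUdf) {
    return topHasJavaUdf == bottomHasJavaUdf;
  }

  public static void main(String[] args) {
    // Outer Calc calls java_udf, inner Calc calls builtin_function.
    System.out.println(canMerge(true, false));
  }
}
```

For the query above, the outer Calc has a Java UDF and the inner one does not, so under this policy the two stay separate.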

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
##########
@@ -184,7 +190,9 @@ private BeamRelNode convertToBeamRelInternal(String sql, QueryParameters queryPa
     RelMetadataQuery.THREAD_PROVIDERS.set(
         JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
     root.rel.getCluster().invalidateMetadataQuery();
-    return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
+    BeamRelNode beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
+    LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode));

Review comment:
       Remove this `LOG.info` before merging the code?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/JavaUdfLoader.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql.translation;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.ProviderNotFoundException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.beam.sdk.extensions.sql.UdfProvider;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Loads {@link UdfProvider} implementations from user-provided jars.
+ *
+ * <p>All UDFs are loaded and cached for each jar to mitigate IO costs.
+ */
+public class JavaUdfLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(JavaUdfLoader.class);
+
+  /**
+   * Maps the external jar location to the functions the jar defines. Static so it can persist
+   * across multiple SQL transforms.
+   */
+  private static final Map<String, UserFunctionDefinitions> cache = new HashMap<>();
+
+  private static ClassLoader originalClassLoader = null;
+
+  /**
+   * Load a user-defined scalar function from the specified jar.
+   *
+   * <p><strong>WARNING</strong>: The first time a jar is loaded, it is added to the thread's
+   * context {@link ClassLoader} so that the jar can be staged by the runner.
+   */
+  public Method loadScalarFunction(List<String> functionPath, String jarPath) {
+    String functionFullName = String.join(".", functionPath);
+    try {
+      UserFunctionDefinitions functionDefinitions = loadJar(jarPath);
+      if (!functionDefinitions.javaScalarFunctions.containsKey(functionPath)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "No implementation of scalar function %s found in %s.\n"
+                    + " 1. Create a class implementing %s and annotate it with @AutoService(%s.class).\n"
+                    + " 2. Add function %s to the class's userDefinedScalarFunctions implementation.",
+                functionFullName,
+                jarPath,
+                UdfProvider.class.getSimpleName(),
+                UdfProvider.class.getSimpleName(),
+                functionFullName));
+      }
+      return functionDefinitions.javaScalarFunctions.get(functionPath);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load user-defined scalar function %s from %s", functionFullName, jarPath),
+          e);
+    }
+  }
+
+  /**
+   * Load a user-defined aggregate function from the specified jar.
+   *
+   * <p><strong>WARNING</strong>: The first time a jar is loaded, it is added to the thread's
+   * context {@link ClassLoader} so that the jar can be staged by the runner.
+   */
+  public Combine.CombineFn loadAggregateFunction(List<String> functionPath, String jarPath) {
+    String functionFullName = String.join(".", functionPath);
+    try {
+      UserFunctionDefinitions functionDefinitions = loadJar(jarPath);
+      if (!functionDefinitions.javaAggregateFunctions.containsKey(functionPath)) {
+        throw new IllegalArgumentException(
+            String.format(
+                "No implementation of aggregate function %s found in %s.\n"
+                    + " 1. Create a class implementing %s and annotate it with @AutoService(%s.class).\n"
+                    + " 2. Add function %s to the class's userDefinedAggregateFunctions implementation.",
+                functionFullName,
+                jarPath,
+                UdfProvider.class.getSimpleName(),
+                UdfProvider.class.getSimpleName(),
+                functionFullName));
+      }
+      return functionDefinitions.javaAggregateFunctions.get(functionPath);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to load user-defined aggregate function %s from %s",
+              functionFullName, jarPath),
+          e);
+    }
+  }
+
+  private ClassLoader createAndSetClassLoader(String inputJarPath) throws IOException {
+    Preconditions.checkArgument(!inputJarPath.isEmpty(), "Jar path cannot be empty.");
+    ResourceId inputJar = FileSystems.matchNewResource(inputJarPath, false /* is directory */);
+    File tmpJar = File.createTempFile("sql-udf-", inputJar.getFilename());
+    FileSystems.copy(
+        Collections.singletonList(inputJar),
+        Collections.singletonList(
+            FileSystems.matchNewResource(tmpJar.getAbsolutePath(), false /* is directory */)));

Review comment:
       Could we verify the jar after the copy completes, e.g. by checking its file size or MD5 hash? If a user's query fails with a `function not found` error because the jar was downloaded incorrectly, a logged verification failure would let us locate the root cause. Otherwise such issues might be hard to debug.
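
   A minimal sketch of the kind of check I have in mind, assuming both the source and the copy are readable as local files (in this PR the source is a `ResourceId` that may be remote, in which case the expected size or hash would have to come from the filesystem's metadata instead). `JarCopyVerifier`, `sha256Hex` and `verifyCopy` are hypothetical names, not part of this PR, and SHA-256 stands in for the MD5 mentioned above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper (not part of this PR) that checks a copied jar against its source. */
final class JarCopyVerifier {
  private JarCopyVerifier() {}

  /** Returns the hex-encoded SHA-256 digest of the file at {@code path}. */
  public static String sha256Hex(Path path) throws IOException, NoSuchAlgorithmException {
    MessageDigest digest = MessageDigest.getInstance("SHA-256");
    try (InputStream in = Files.newInputStream(path)) {
      byte[] buffer = new byte[8192];
      int read;
      while ((read = in.read(buffer)) != -1) {
        digest.update(buffer, 0, read);
      }
    }
    StringBuilder hex = new StringBuilder();
    for (byte b : digest.digest()) {
      hex.append(String.format("%02x", b));
    }
    return hex.toString();
  }

  /** Throws an IOException naming the mismatch if the copy differs from the source. */
  public static void verifyCopy(Path source, Path copy)
      throws IOException, NoSuchAlgorithmException {
    // Cheap check first: a truncated download shows up as a size mismatch.
    long sourceSize = Files.size(source);
    long copySize = Files.size(copy);
    if (sourceSize != copySize) {
      throw new IOException(
          String.format("Copied jar %s has size %d, expected %d.", copy, copySize, sourceSize));
    }
    // Same size but different content is caught by comparing digests.
    String sourceHash = sha256Hex(source);
    String copyHash = sha256Hex(copy);
    if (!sourceHash.equals(copyHash)) {
      throw new IOException(
          String.format("Copied jar %s has SHA-256 %s, expected %s.", copy, copyHash, sourceHash));
    }
  }
}
```

   Failing at copy time with the path and the mismatch in the message would point at the download rather than at a missing `UdfProvider`.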




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521641628



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/impl/ScalarFnImpl.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql.translation.impl;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.beam.sdk.extensions.sql.ApplyMethod;
+import org.apache.beam.sdk.extensions.sql.ScalarFn;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
+
+/** Implementation logic for {@link ScalarFn}. */
+public class ScalarFnImpl {

Review comment:
       Self comment: this is unrelated to `ScalarFunctionImpl` and so should be renamed to avoid confusion.







[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-717514966


   The failed SQL precommits are caused by Janino compilation errors. For example:
   
   ```
   java.lang.UnsupportedOperationException: Could not compile CalcFn: {
     c.output(org.apache.beam.sdk.values.Row.withSchema(outputSchema).attachValues(java.util.Arrays.asList(org.apache.beam.sdk.extensions.sql.zetasql.translation.impl.TimestampFunctions.timestamp(1230219000000L))));
   }
   ```
   It is not obvious to me why this PR causes these tests to fail. I will take another look to see whether I can find the root cause.
   





[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512938754



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       I can see why you think `Method` exposes too many things that might not be relevant. Ideally we would only need users to provide the function signature (including the function name), which is enough to 1) give the SQL parser the information it requires and 2) locate the right implementation in the user jar.
   
   
   The reason we still use `Method` is that the current UDF implementation asks for a `Method`: [1] [2] [3]. Since we will need to construct a `Method` eventually, exposing it in the interface doesn't seem like a bad choice.
   
   
   [1]: https://github.com/apache/beam/blob/5076255fda1700f0d3ac2a9e5e73372bdf3c59dd/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L96
   [2]: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L111
   [3]: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java#L195
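
   For illustration, a minimal sketch of what a provider returning a `Method` might look like. The `UdfProvider` declared here is only a stand-in that mirrors the interface in this diff so the sketch compiles without Beam, its default return value is an assumption, and `ExampleUdfProvider` and `increment` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

/** Stand-in mirroring the interface in this diff; the real one lives in the Beam SQL extension. */
interface UdfProvider {
  /** Maps function names to scalar function implementations (empty by default, assumed). */
  default Map<String, Method> userDefinedScalarFunctions() {
    return Collections.emptyMap();
  }
}

/** Hypothetical provider exposing one scalar function through reflection. */
class ExampleUdfProvider implements UdfProvider {
  /** The function body; public and static so it can be looked up and invoked reflectively. */
  public static Long increment(Long i) {
    return i + 1;
  }

  @Override
  public Map<String, Method> userDefinedScalarFunctions() {
    try {
      // The user constructs the Method; the planner only ever sees the map entry.
      return Collections.singletonMap(
          "increment", ExampleUdfProvider.class.getMethod("increment", Long.class));
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException("increment(Long) is missing from the provider", e);
    }
  }
}
```

   The `getMethod` boilerplate is the cost of exposing `Method` directly; the benefit is that the planner needs no extra lookup step before handing the function to the existing UDF implementation.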







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513097589



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -97,22 +102,43 @@ public RelRoot rel(String sql, QueryParameters params) {
     SimpleCatalog catalog =
         analyzer.createPopulatedCatalog(defaultSchemaPlus.getName(), options, tables);
 
-    ImmutableMap.Builder<String, ResolvedCreateFunctionStmt> udfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
+        ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Method> javaScalarFunctionBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Combine.CombineFn> javaAggregateFunctionBuilder =
+        ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
-        // ResolvedCreateFunctionStmt does not include the full function name, so build it here.
-        String functionFullName =
-            String.format(
-                "%s:%s",
-                SqlAnalyzer.USER_DEFINED_FUNCTIONS,
-                String.join(".", createFunctionStmt.getNamePath()));
-        udfBuilder.put(functionFullName, createFunctionStmt);
+        if (createFunctionStmt.getLanguage().toUpperCase().equals("SQL")) {
+          udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+        } else if (createFunctionStmt.getLanguage().toUpperCase().equals("JAVA")) {

Review comment:
       I exposed `SqlAnalyzer.getFunctionGroup` for this purpose.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -97,22 +102,43 @@ public RelRoot rel(String sql, QueryParameters params) {
     SimpleCatalog catalog =
         analyzer.createPopulatedCatalog(defaultSchemaPlus.getName(), options, tables);
 
-    ImmutableMap.Builder<String, ResolvedCreateFunctionStmt> udfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
+        ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Method> javaScalarFunctionBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, Combine.CombineFn> javaAggregateFunctionBuilder =
+        ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
-        // ResolvedCreateFunctionStmt does not include the full function name, so build it here.
-        String functionFullName =
-            String.format(
-                "%s:%s",
-                SqlAnalyzer.USER_DEFINED_FUNCTIONS,
-                String.join(".", createFunctionStmt.getNamePath()));
-        udfBuilder.put(functionFullName, createFunctionStmt);
+        if (createFunctionStmt.getLanguage().toUpperCase().equals("SQL")) {

Review comment:
       Same as above: `SqlAnalyzer.getFunctionGroup` is exposed for this purpose.







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512942433



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -82,8 +80,6 @@
           // Rules for window functions
           ProjectToWindowRule.PROJECT,
           // Rules so we only have to implement Calc
-          FilterCalcMergeRule.INSTANCE,
-          ProjectCalcMergeRule.INSTANCE,

Review comment:
       Meanwhile, one rule the ZetaSQL planner does require: we probably need a `BeamZetaSQLCalcMergeRule` that does not merge two Calcs when one has a Java UDF and the other does not. This will be useful when users write a query like
   ```
   select java_udf(x) from (select builtin_function(y) as x from table) as t;
   ```
   so that we can have a plan such as
   ```
   JavaUDFRel
   ----ZetaSQLRel
   --------TableScan
   ```
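
   The merge condition itself could be sketched roughly as follows. This is a plain-Java model of the decision only, not Calcite's or Beam's rule API; `CalcMergeSketch`, `usesJavaUdf` and `canMerge` are hypothetical names, and each Calc is modelled simply as the list of function names it calls.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical model of the merge decision; it does not use the real planner classes. */
final class CalcMergeSketch {
  private CalcMergeSketch() {}

  /** Returns true if any function called by the Calc is a registered Java UDF. */
  public static boolean usesJavaUdf(List<String> calledFunctions, Set<String> javaUdfs) {
    for (String function : calledFunctions) {
      if (javaUdfs.contains(function)) {
        return true;
      }
    }
    return false;
  }

  /** Adjacent Calcs may merge only when both use Java UDFs or neither does. */
  public static boolean canMerge(
      List<String> topCalc, List<String> bottomCalc, Set<String> javaUdfs) {
    return usesJavaUdf(topCalc, javaUdfs) == usesJavaUdf(bottomCalc, javaUdfs);
  }
}
```

   For the query above, the outer Calc calls `java_udf` and the inner one calls `builtin_function`, so `canMerge` returns false and the two stay separate. What to do with a single Calc that mixes both kinds of function is left open here.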







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r514565092



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT foo();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION fun(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT fun(\"a\", \"a\"), 'apple'='beta'",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField =
+        Schema.builder().addBooleanField("field1").addBooleanField("field2").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues(true, false).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBuiltinAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION agg_fun(str STRING) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT agg_fun(Value) from KeyValue",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testCustomAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION custom_agg(f INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT custom_agg(f_int_1) from aggregate_test_table",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(0L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnregisteredFunction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION notRegistered() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT notRegistered();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage(
+        String.format("No implementation of scalar function notRegistered found in %s.", jarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJarContainsNoUdfProviders() {
+    assumeNotNull(jarPath);
+    assumeNotNull(emptyJarPath);
+    String sql =
+        String.format(
+            // Load an inhabited jar first so we can make sure jars load UdfProviders in isolation
+            // from other jars.
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "CREATE FUNCTION bar() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT bar();",
+            jarPath, emptyJarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(ProviderNotFoundException.class);
+    thrown.expectMessage(String.format("No UdfProvider implementation found in %s.", emptyJarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJavaUdfNoJarProvided() {
+    String sql = "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java; SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No jar was provided to define function foo.");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testPathOptionNotString() {
+    String sql =
+        "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path=23); SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Option 'path' has type TYPE_INT64 (expected TYPE_STRING).");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }

Review comment:
       I added some tests. I discovered a gap in type checking, since the current implementation does not strictly enforce that the SQL function signature must match the Java signature. I will fix that.
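
   Roughly, the missing check could compare the argument types declared in SQL against the parameter types of the Java `Method`. The following is only a sketch under assumptions: the SQL-to-Java type mapping is illustrative rather than Beam's actual mapping, only argument types are checked (not the return type), and `SignatureCheckSketch` and `checkSignature` are hypothetical names.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of a SQL-vs-Java signature check; not the fix that will land. */
final class SignatureCheckSketch {
  private SignatureCheckSketch() {}

  /** Assumed mapping from SQL type names to Java parameter types (illustrative only). */
  private static final Map<String, Class<?>> SQL_TO_JAVA = new HashMap<>();

  static {
    SQL_TO_JAVA.put("INT64", Long.class);
    SQL_TO_JAVA.put("STRING", String.class);
    SQL_TO_JAVA.put("BOOL", Boolean.class);
  }

  /** Throws IllegalArgumentException if the SQL argument types do not match the Java method. */
  public static void checkSignature(String functionName, List<String> sqlArgTypes, Method method) {
    Class<?>[] javaTypes = method.getParameterTypes();
    if (javaTypes.length != sqlArgTypes.size()) {
      throw new IllegalArgumentException(
          String.format(
              "Function %s declares %d SQL arguments but the Java method takes %d.",
              functionName, sqlArgTypes.size(), javaTypes.length));
    }
    for (int i = 0; i < javaTypes.length; i++) {
      Class<?> expected = SQL_TO_JAVA.get(sqlArgTypes.get(i));
      if (expected == null) {
        throw new IllegalArgumentException(
            String.format("Unsupported SQL type %s in function %s.", sqlArgTypes.get(i), functionName));
      }
      if (!expected.equals(javaTypes[i])) {
        throw new IllegalArgumentException(
            String.format(
                "Argument %d of function %s has SQL type %s but Java type %s.",
                i, functionName, sqlArgTypes.get(i), javaTypes[i].getSimpleName()));
      }
    }
  }

  /** Sample Java implementation used to exercise the check. */
  public static Long increment(Long i) {
    return i + 1;
  }
}
```

   With a check like this, declaring `increment(s STRING)` in SQL against a Java `increment(Long)` would fail at planning time instead of slipping through to code generation.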







[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-718252195


   Run Java PreCommit





[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-717715517


   @ibzib now I can see the failed tests better. 
   
   If you check the error messages, the failed tests are hitting BeamCalcRel. However, ZetaSQL function tests like these should hit BeamZetaSqlRel. Can you run one of the failed tests locally, observe how `BeamJavaUdfRule` and `BeamZetaSqlRule` are visited, and see why `BeamJavaUdfRule` is matched and BeamCalcRel is therefore used?





[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521577848



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.
+  @Test
+  public void testNullArgumentIsNotTypeChecked() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(0);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Could not compile CalcFn");
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION matches(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT matches(\"a\", \"a\"), 'apple'='beta'",

Review comment:
       Yes, you are right. This is expected behavior for now: this is a big initial PR whose goal is to "execute Java UDF but might still have places to improve". As follow-up work I will check for mixed usage of UDFs and non-UDF functions and reject it.
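
A minimal sketch of what such a follow-up check could look like, assuming a hypothetical expression model (`Expr`, `Kind`) instead of the Calcite `RexNode` tree the planner really uses. It walks each projected expression and rejects a projection list that contains both a Java UDF call and a builtin call; none of these names come from the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical expression model for illustration only; not Beam or Calcite API.
class MixedUdfCheckSketch {

  enum Kind {
    LITERAL,
    BUILTIN_CALL,
    JAVA_UDF_CALL
  }

  static final class Expr {
    final Kind kind;
    final List<Expr> operands;

    Expr(Kind kind, Expr... operands) {
      this.kind = kind;
      this.operands = Collections.unmodifiableList(Arrays.asList(operands));
    }
  }

  /** Returns true if the expression or any nested operand has the given kind. */
  static boolean contains(Expr expr, Kind kind) {
    if (expr.kind == kind) {
      return true;
    }
    for (Expr operand : expr.operands) {
      if (contains(operand, kind)) {
        return true;
      }
    }
    return false;
  }

  /** Returns true if the projections use both Java UDF calls and builtin calls. */
  static boolean isMixed(List<Expr> projects) {
    boolean hasUdf = false;
    boolean hasBuiltin = false;
    for (Expr project : projects) {
      hasUdf |= contains(project, Kind.JAVA_UDF_CALL);
      hasBuiltin |= contains(project, Kind.BUILTIN_CALL);
    }
    return hasUdf && hasBuiltin;
  }

  /** Throws if the projection list mixes Java UDF calls with builtin calls. */
  static void rejectMixedUsage(List<Expr> projects) {
    if (isMixed(projects)) {
      throw new UnsupportedOperationException(
          "Mixing Java UDFs with builtin functions in one Calc is not supported.");
    }
  }

  public static void main(String[] args) {
    Expr literal = new Expr(Kind.LITERAL);
    // Shape of: SELECT increment(increment(0))
    Expr nestedUdf = new Expr(Kind.JAVA_UDF_CALL, new Expr(Kind.JAVA_UDF_CALL, literal));
    // Shape of: 'apple' = 'beta'
    Expr comparison = new Expr(Kind.BUILTIN_CALL, literal, literal);

    rejectMixedUsage(Arrays.asList(nestedUdf)); // accepted: UDF calls and literals only
    System.out.println("mixed: " + isMixed(Arrays.asList(nestedUdf, comparison)));
  }
}
```

Literals are treated as neutral here, so a query such as `SELECT increment(increment(0))` would still be accepted under this sketch.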




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r514490438



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -116,6 +139,51 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in {@code statement}. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return String.format(
+        "%s:%s",
+        getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+  }
+
+  static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+    switch (createFunctionStmt.getLanguage().toUpperCase()) {
+      case "JAVA":
+        return createFunctionStmt.getIsAggregate()
+            ? USER_DEFINED_JAVA_AGGREGATE_FUNCTIONS
+            : USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+      case "SQL":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Native SQL aggregate functions are not supported (BEAM-9954).");
+        }
+        return USER_DEFINED_FUNCTIONS;
+      case "PY":
+      case "PYTHON":
+      case "JS":
+      case "JAVASCRIPT":
+        throw new UnsupportedOperationException(
+            String.format(
+                "Function %s uses unsupported language %s.",
+                String.join(".", createFunctionStmt.getNamePath()),
+                createFunctionStmt.getLanguage()));
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Function %s uses unrecognized language %s.",
+                String.join(".", createFunctionStmt.getNamePath()),
+                createFunctionStmt.getLanguage()));
+    }
+  }
+
+  private Function createFunction(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return new Function(
+        createFunctionStmt.getNamePath(),
+        getFunctionGroup(createFunctionStmt),
+        createFunctionStmt.getIsAggregate() ? Mode.AGGREGATE : Mode.SCALAR,

Review comment:
       There is a third case: table functions. But it is OK for now to have a binary choice here; table function handling can be added later.







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r515554810



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+
+public class BeamJavaUdfCalcRule extends BeamZetaSqlCalcRuleBase {

Review comment:
       @amaliujia can you write a Javadoc for this? (after addressing Kenn's other comment on this file)

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       I created a custom interface `ScalarFn`, which uses annotation `@ApplyMethod` to locate the method to be applied. It adds a little more boilerplate (+2 SLOC / function), but now it's extensible.
   
   > But I would still like to see a test case demonstrating what it is like for a user to register a UDF using a lambda / anonymous function.
   
   One of the drawbacks of the annotation approach is that it is not compatible with lambdas. But we needed the flexibility; lambdas require a known/fixed number of arguments.
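
To illustrate the trade-off described above, here is a minimal sketch of annotation-based method lookup. The `ScalarFn` and `ApplyMethod` types below are simplified stand-ins defined locally for the example; they are assumptions and may differ from the classes actually added in the PR. The point is that the annotated method can take any number of arguments, which a fixed-arity lambda interface cannot express.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Simplified, hypothetical stand-ins; not the real Beam ScalarFn / ApplyMethod.
class ApplyMethodSketch {

  /** Marks the single method that implements the function body. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ApplyMethod {}

  /** Base type for scalar functions; subclasses choose their own arity. */
  abstract static class ScalarFn {}

  static class IncrementFn extends ScalarFn {
    @ApplyMethod
    public Long increment(Long i) {
      return i + 1;
    }
  }

  static class MatchFn extends ScalarFn {
    @ApplyMethod
    public Boolean matches(String s, String regex) {
      return s.matches(regex);
    }
  }

  /** Finds the unique public method annotated with @ApplyMethod. */
  static Method findApplyMethod(ScalarFn fn) {
    Method found = null;
    for (Method method : fn.getClass().getMethods()) {
      if (method.isAnnotationPresent(ApplyMethod.class)) {
        if (found != null) {
          throw new IllegalArgumentException("More than one @ApplyMethod in " + fn.getClass());
        }
        found = method;
      }
    }
    if (found == null) {
      throw new IllegalArgumentException("No @ApplyMethod in " + fn.getClass());
    }
    return found;
  }

  /** Invokes the annotated method reflectively with the given arguments. */
  static Object invoke(ScalarFn fn, Object... args) {
    try {
      Method method = findApplyMethod(fn);
      method.setAccessible(true);
      return method.invoke(fn, args);
    } catch (IllegalAccessException | InvocationTargetException e) {
      throw new RuntimeException("Failed to apply " + fn.getClass().getName(), e);
    }
  }

  public static void main(String[] args) {
    System.out.println(invoke(new IncrementFn(), 1L));
    System.out.println(invoke(new MatchFn(), "a", "a"));
  }
}
```

The cost is the one noted in the comment: a lambda has no place to carry the annotation, so every function needs a small named class.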

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRuleBase.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTrait;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+public abstract class BeamZetaSqlCalcRuleBase extends ConverterRule {
+
+  public BeamZetaSqlCalcRuleBase(
+      Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String description) {
+    super(clazz, in, out, description);
+  }
+
+  protected boolean hasUdfInProjects(RelOptRuleCall x) {

Review comment:
       @amaliujia wrote this part so I'll let him comment.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -80,58 +81,59 @@
           new UdafImpl<>(new StringAgg.StringAggString()));
 
   public static final SqlOperator START_WITHS =
-      createUdfOperator("STARTS_WITH", BeamBuiltinMethods.STARTS_WITH_METHOD);
+      createBuiltinFunctionOperator("STARTS_WITH", BeamBuiltinMethods.STARTS_WITH_METHOD);
 
   public static final SqlOperator CONCAT =
-      createUdfOperator("CONCAT", BeamBuiltinMethods.CONCAT_METHOD);
+      createBuiltinFunctionOperator("CONCAT", BeamBuiltinMethods.CONCAT_METHOD);
 
   public static final SqlOperator REPLACE =
-      createUdfOperator("REPLACE", BeamBuiltinMethods.REPLACE_METHOD);
+      createBuiltinFunctionOperator("REPLACE", BeamBuiltinMethods.REPLACE_METHOD);
 
-  public static final SqlOperator TRIM = createUdfOperator("TRIM", BeamBuiltinMethods.TRIM_METHOD);
+  public static final SqlOperator TRIM =
+      createBuiltinFunctionOperator("TRIM", BeamBuiltinMethods.TRIM_METHOD);
 
   public static final SqlOperator LTRIM =
-      createUdfOperator("LTRIM", BeamBuiltinMethods.LTRIM_METHOD);
+      createBuiltinFunctionOperator("LTRIM", BeamBuiltinMethods.LTRIM_METHOD);
 
   public static final SqlOperator RTRIM =
-      createUdfOperator("RTRIM", BeamBuiltinMethods.RTRIM_METHOD);
+      createBuiltinFunctionOperator("RTRIM", BeamBuiltinMethods.RTRIM_METHOD);
 
   public static final SqlOperator SUBSTR =
-      createUdfOperator("SUBSTR", BeamBuiltinMethods.SUBSTR_METHOD);
+      createBuiltinFunctionOperator("SUBSTR", BeamBuiltinMethods.SUBSTR_METHOD);
 
   public static final SqlOperator REVERSE =
-      createUdfOperator("REVERSE", BeamBuiltinMethods.REVERSE_METHOD);
+      createBuiltinFunctionOperator("REVERSE", BeamBuiltinMethods.REVERSE_METHOD);
 
   public static final SqlOperator CHAR_LENGTH =
-      createUdfOperator("CHAR_LENGTH", BeamBuiltinMethods.CHAR_LENGTH_METHOD);
+      createBuiltinFunctionOperator("CHAR_LENGTH", BeamBuiltinMethods.CHAR_LENGTH_METHOD);
 
   public static final SqlOperator ENDS_WITH =
-      createUdfOperator("ENDS_WITH", BeamBuiltinMethods.ENDS_WITH_METHOD);
+      createBuiltinFunctionOperator("ENDS_WITH", BeamBuiltinMethods.ENDS_WITH_METHOD);
 
   public static final SqlOperator LIKE =
-      createUdfOperator("LIKE", BeamBuiltinMethods.LIKE_METHOD, SqlSyntax.BINARY);
+      createBuiltinFunctionOperator("LIKE", BeamBuiltinMethods.LIKE_METHOD, SqlSyntax.BINARY);
 
   public static final SqlOperator VALIDATE_TIMESTAMP =
-      createUdfOperator(
+      createBuiltinFunctionOperator(
           "validateTimestamp",
           DateTimeUtils.class,
           "validateTimestamp",
           x -> NULLABLE_TIMESTAMP,
           ImmutableList.of(TIMESTAMP));
 
   public static final SqlOperator VALIDATE_TIME_INTERVAL =
-      createUdfOperator(
+      createBuiltinFunctionOperator(
           "validateIntervalArgument",
           DateTimeUtils.class,
           "validateTimeInterval",
           x -> NULLABLE_BIGINT,
           ImmutableList.of(BIGINT, OTHER));
 
   public static final SqlOperator TIMESTAMP_OP =
-      createUdfOperator("TIMESTAMP", BeamBuiltinMethods.TIMESTAMP_METHOD);
+      createBuiltinFunctionOperator("TIMESTAMP", BeamBuiltinMethods.TIMESTAMP_METHOD);
 
   public static final SqlOperator DATE_OP =
-      createUdfOperator("DATE", BeamBuiltinMethods.DATE_METHOD);
+      createBuiltinFunctionOperator("DATE", BeamBuiltinMethods.DATE_METHOD);

Review comment:
       I removed `createBuiltinFunctionOperator` and required all function definitions to specify a function group.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+
+public class BeamJavaUdfCalcRule extends BeamZetaSqlCalcRuleBase {

Review comment:
       IIRC committers can commit directly to someone else's PR, so you can change it in this PR if you want.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+
+public class BeamJavaUdfCalcRule extends BeamZetaSqlCalcRuleBase {

Review comment:
       Alternatively, it seems we do not really need this subclass at all. There's no reason we can't just use ConverterRule directly.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+
+public class BeamJavaUdfCalcRule extends BeamZetaSqlCalcRuleBase {

Review comment:
       Actually this was more straightforward than I thought. So I did it myself. PTAL







[GitHub] [beam] ibzib commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-722008588


   @kennknowles This is ready for another review now.





[GitHub] [beam] kennknowles commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513086568



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       I don't agree. If it is a simple function, a method can always be used as in `SomeClass::methodName` and this is also easier for following control flow (including for IDEs). Whenever you can use explicit method references instead of reflection, you should.
   
   The converse is not true. If a function is provided, it cannot automatically be converted to `Method`. The class `Method` is specifically a rich representation of an aspect of a `Class` in the JVM. It is not a good class for use when you want some sort of function. It is not a good class for anything _except_ when you specifically want to manipulate the JVM's representation of a class.
   
   If we expand the interface, we can maintain backwards-compatibility with `default` method implementation. If we need to expand it in a way where this is not possible we will need to create a new interface. If we use `Method` it is not possible to expand the interface anyhow.
   
   I really think if you look at the pro/con of using `Method` vs a smaller interface there is not a single thing in the "pro" column.
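
The asymmetry argued above can be sketched in a few lines. This example uses `java.util.function.Function` as a stand-in for the "smaller interface"; that choice, and every name in the class, is an assumption made for illustration and not something proposed in the thread. A named method becomes a function through a compiler-checked method reference, and a `Method` can be wrapped into a function, but a lambda offers no route back to a `Method`.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Illustrative only; Function<Long, Long> stands in for a dedicated UDF interface.
class MethodVsFunctionSketch {

  static Long increment(Long i) {
    return i + 1;
  }

  /** Registers functions by name without reflection. */
  static Map<String, Function<Long, Long>> functions() {
    Map<String, Function<Long, Long>> fns = new LinkedHashMap<>();
    // Method reference: the compiler verifies the method exists and its types fit.
    fns.put("increment", MethodVsFunctionSketch::increment);
    // Lambda: there is no java.lang.reflect.Method that a caller could recover from this.
    fns.put("twice", i -> i * 2);
    return fns;
  }

  /** Reflection: the name is a string, so a typo only fails at runtime. */
  static Method reflectiveIncrement() {
    try {
      return MethodVsFunctionSketch.class.getDeclaredMethod("increment", Long.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  /** A static Method can always be adapted to a function; the reverse is not possible. */
  static Function<Long, Long> asFunction(Method staticMethod) {
    return arg -> {
      try {
        return (Long) staticMethod.invoke(null, arg);
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException(e);
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(functions().get("increment").apply(1L));
    System.out.println(functions().get("twice").apply(4L));
    System.out.println(asFunction(reflectiveIncrement()).apply(41L));
  }
}
```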







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512938754



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
   I can see why you think `Method` carries too many things that might not be relevant. Ideally we only need users to provide a function signature (including the function name), which is enough to 1) give the SQL parser the information it requires and 2) locate the right implementation in the user jar.
   
   
   The reason we still use `Method` is that the current UDF implementation asks for a `Method`: [1] [2]. Since we will need to construct a `Method` eventually, it doesn't seem like a bad choice to expose it in the interface.
   
   
   [1]: https://github.com/apache/beam/blob/5076255fda1700f0d3ac2a9e5e73372bdf3c59dd/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L96
   [2]: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L111
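
As a rough illustration of point 2) above, a signature (function name plus argument types) is enough to resolve a `Method` with plain reflection. The `UserFunctions` class here is a hypothetical stand-in for a class loaded from a user jar, and `resolve` is not a Beam API.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Sketch under stated assumptions; not how Beam resolves UDF implementations.
class SignatureLookupSketch {

  /** Stand-in for a class that would normally come from the user's UDF jar. */
  public static class UserFunctions {
    public static Long increment(Long i) {
      return i + 1;
    }

    public static Boolean isNull(String s) {
      return s == null;
    }
  }

  /** Resolves a public method from a function name and its argument types. */
  static Method resolve(Class<?> implClass, String functionName, Class<?>... argTypes) {
    try {
      return implClass.getMethod(functionName, argTypes);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(
          String.format(
              "No function %s%s in %s",
              functionName, Arrays.toString(argTypes), implClass.getName()),
          e);
    }
  }

  public static void main(String[] args) throws Exception {
    Method increment = resolve(UserFunctions.class, "increment", Long.class);
    System.out.println(increment.invoke(null, 41L));
  }
}
```

A lookup like this fails early when the declared argument types do not match the Java method, for example `resolve(UserFunctions.class, "isNull", Long.class)`.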







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513165094



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
   @kennknowles Maybe we should first get an idea of what smaller interface you have in mind, in case we have been thinking of different things. Can you give an example? Then we can compare the pros and cons.
   
   We can go back to the design doc to discuss this if you prefer. Right now I cannot find the original comment you mentioned, so starting a new comment is OK.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-718242175


   Overall LGTM. @kennknowles do you have other comments besides the interface discussion?





[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521583311



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/ScalarFn.java
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.Serializable;
+
+/**
+ * A scalar function that can be executed as part of a SQL query. Subclasses must contain exactly
+ * one method annotated with {@link ApplyMethod}, which will be applied to the SQL function

Review comment:
       Lambdas are not compatible with the annotation-based design.
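
A minimal sketch of why a lambda cannot take part in an annotation-based lookup. The `ApplyMethod` annotation below is a local stand-in defined only for this example, and `countApplyMethods` is a hypothetical helper, not code from the PR. The point is that a lambda body has no place to carry a method annotation, so a reflective scan of the lambda's class finds nothing.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.function.Function;

class LambdaAnnotationSketch {
  // Local stand-in for the PR's ApplyMethod annotation (assumption for illustration).
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ApplyMethod {}

  // A UDF written as a class can put the annotation on its method.
  static class IncrementFn {
    @ApplyMethod
    public Long increment(Long i) {
      return i + 1;
    }
  }

  // Counts the methods declared directly on clazz that carry @ApplyMethod.
  static long countApplyMethods(Class<?> clazz) {
    long count = 0;
    for (Method m : clazz.getDeclaredMethods()) {
      if (m.isAnnotationPresent(ApplyMethod.class)) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    // The lambda has the same behavior, but nowhere to attach @ApplyMethod.
    Function<Long, Long> lambda = i -> i + 1;
    System.out.println(countApplyMethods(IncrementFn.class));
    System.out.println(countApplyMethods(lambda.getClass()));
  }
}
```

The class-based UDF exposes one annotated method, while the lambda's generated class exposes none, so a "find the single annotated method" rule would reject it.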







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512938754



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       I can see why you think `Method` has too many things that might not be relevant. Ideally we only need users to provide a function name plus a function signature, which are enough to 1) offer the required information to the SQL parser and 2) locate the right implementation in the user jar.
   
   
   The reason we still use `Method` is that the current UDF implementation asks for a `Method`: [1] [2]. Since we will need to construct a `Method` eventually, it doesn't seem a bad choice to expose it in the interface.
   
   
   [1]: https://github.com/apache/beam/blob/5076255fda1700f0d3ac2a9e5e73372bdf3c59dd/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L96
   [2]: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L111
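
A rough sketch of the "function name plus signature is enough to locate the implementation" idea, using only standard reflection. `MyUdfs`, `resolve`, and `invokeUdf` are hypothetical names for this example; the map shape mirrors `userDefinedScalarFunctions()` in the diff, but this is not the PR's implementation.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

class MethodLookupSketch {
  // Hypothetical user class that would live in the UDF jar.
  static class MyUdfs {
    public static Long increment(Long i) {
      return i + 1;
    }
  }

  // Resolves a function name plus parameter types to a public Method on holder.
  static Method resolve(Class<?> holder, String name, Class<?>... parameterTypes) {
    try {
      return holder.getMethod(name, parameterTypes);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No such UDF: " + name, e);
    }
  }

  // Same shape as the provider method under review: SQL function name -> Method.
  static Map<String, Method> userDefinedScalarFunctions() {
    return Collections.singletonMap("increment", resolve(MyUdfs.class, "increment", Long.class));
  }

  // Invokes a registered static UDF by its SQL name.
  static Object invokeUdf(String name, Object... args) {
    try {
      return userDefinedScalarFunctions().get(name).invoke(null, args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("UDF call failed: " + name, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(invokeUdf("increment", 41L));
  }
}
```

The name and parameter types are the only inputs the lookup needs; the `Method` is derived from them, which is the trade-off being debated in this thread.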







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r516265825



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamJavaUdfCalcRule.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.Convention;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+
+public class BeamJavaUdfCalcRule extends BeamZetaSqlCalcRuleBase {

Review comment:
       Sure, I can handle this. Can you add a TODO so I can add javadoc in another PR? I will need to write PRs to polish the code.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRuleBase.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.ScalarFunctionImpl;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTrait;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.logical.LogicalCalc;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlUserDefinedFunction;
+
+public abstract class BeamZetaSqlCalcRuleBase extends ConverterRule {
+
+  public BeamZetaSqlCalcRuleBase(
+      Class<? extends RelNode> clazz, RelTrait in, RelTrait out, String description) {
+    super(clazz, in, out, description);
+  }
+
+  protected boolean hasUdfInProjects(RelOptRuleCall x) {

Review comment:
       We can change this to a static function now.
   
   The idea was to use inheritance to build a set of rules that split Calc so we can separate Java, Python and built-in function execution. As UDF support in this PR is currently limited, it is not clear why inheritance is needed.
   
   When there is a need for inheritance in the future, after we start to support mixing built-in functions and Java UDFs, we can add it back.
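
A simplified sketch of the static-helper alternative suggested above. It deliberately avoids Calcite types: `Expr` is a hypothetical stand-in for a projected expression, and the real check in the PR inspects Calcite nodes instead. The only point shown is that the check needs no instance state, so it can be a static utility rather than a protected method on a base rule class.

```java
import java.util.Arrays;
import java.util.List;

class CalcRuleUtilSketch {
  // Hypothetical stand-in for a projected expression (not a Calcite type).
  static final class Expr {
    final String name;
    final boolean isUdfCall;

    Expr(String name, boolean isUdfCall) {
      this.name = name;
      this.isUdfCall = isUdfCall;
    }
  }

  // Static helper: depends only on its argument, so no base class is required.
  static boolean hasUdfInProjects(List<Expr> projects) {
    for (Expr e : projects) {
      if (e.isUdfCall) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    List<Expr> projects = Arrays.asList(new Expr("UPPER", false), new Expr("increment", true));
    System.out.println(hasUdfInProjects(projects));
  }
}
```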

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
##########
@@ -2197,66 +2197,11 @@ private static void validateTimerFamilyField(
     return DoFnSignature.GetSizeMethod.create(m, windowT, methodContext.getExtraParameters());
   }
 
-  private static Collection<Method> declaredMethodsWithAnnotation(

Review comment:
       The overall idea looks good to me. I will leave it to @kennknowles to comment on this part, as @kennknowles knows more about the Java SDK.







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513097352



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java
##########
@@ -82,8 +80,6 @@
           // Rules for window functions
           ProjectToWindowRule.PROJECT,
           // Rules so we only have to implement Calc
-          FilterCalcMergeRule.INSTANCE,
-          ProjectCalcMergeRule.INSTANCE,

Review comment:
       I reverted changes to this file.







[GitHub] [beam] ibzib commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-717608701


   > @ibzib this PR is lagging behind master by 200+ commits. Maybe we could try a rebase and then check the failing precommit tests?
   
   @amaliujia I rebased, but those tests are still failing.





[GitHub] [beam] ibzib closed pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib closed pull request #13200:
URL: https://github.com/apache/beam/pull/13200


   





[GitHub] [beam] kennknowles commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512922228



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       Commented on the design doc: it is really preferable not to use `Method` here, but rather a specialized interface that is more specific. `Method` has a bunch of things that are not relevant, and if we want to add UDF-specific methods we can't.







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513761419



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       Per our offline chat, we can continue discussing what a good interface should be and this PR should not be blocked. We will mark this interface as experimental and refine when necessary.







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513165094



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       @kennknowles Maybe we should get an idea of what smaller interface you are thinking of? Can you give an example? Then we can compare and see the pros/cons.







[GitHub] [beam] kennknowles commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
kennknowles commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513086568



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       I don't agree. If it is a simple function, a method can always be used as in `SomeClass::methodName` and this is also easier for following control flow (including for IDEs). Whenever you can use explicit method references instead of reflection, you should.
   
   The converse is not true. If a function is provided, it cannot automatically be converted to `Method`. The class `Method` is specifically a rich representation of an aspect of a `Class` in the JVM. It is not a good class for use when you want some sort of function. It is not a good class for anything _except_ when you specifically want to manipulate the JVM's representation of a class.
   
   If we expand the interface, we can maintain backwards-compatibility with `default` method implementations. If we need to expand it in a way where this is not possible, we will need to create a new interface. If we use `Method`, it is not possible to expand the interface anyhow.
   
   I really think if you look at the pro/con of using `Method` vs a smaller interface there is not a single thing in the `pro` column.
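
To make the comparison concrete, here is one possible shape for a smaller interface. This is an illustrative sketch only: `UnaryScalarFn` and `isDeterministic` are hypothetical names, not the interface that was proposed in the design doc or adopted by Beam. It shows the two properties argued for above: implementations can be supplied as explicit method references, and the interface can grow through `default` methods.

```java
import java.util.Collections;
import java.util.Map;

class SmallUdfInterfaceSketch {
  // Hypothetical narrow interface for a one-argument scalar UDF.
  @FunctionalInterface
  interface UnaryScalarFn<InputT, OutputT> {
    OutputT apply(InputT input);

    // UDF-specific metadata added later as a default method;
    // existing implementations keep compiling unchanged.
    default boolean isDeterministic() {
      return true;
    }
  }

  static Long increment(Long i) {
    return i + 1;
  }

  // Registers the UDF with an explicit method reference instead of a reflective lookup.
  static Map<String, UnaryScalarFn<Long, Long>> userDefinedScalarFunctions() {
    UnaryScalarFn<Long, Long> fn = SmallUdfInterfaceSketch::increment;
    return Collections.singletonMap("increment", fn);
  }

  public static void main(String[] args) {
    UnaryScalarFn<Long, Long> fn = userDefinedScalarFunctions().get("increment");
    System.out.println(fn.apply(41L));
    System.out.println(fn.isDeterministic());
  }
}
```

One caveat with this particular sketch: generic type arguments are erased at runtime, so a real interface would still need some way to tell the SQL planner the argument and return types.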







[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-716975441









[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r514566869



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT foo();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION fun(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT fun(\"a\", \"a\"), 'apple'='beta'",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField =
+        Schema.builder().addBooleanField("field1").addBooleanField("field2").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues(true, false).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBuiltinAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION agg_fun(str STRING) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT agg_fun(Value) from KeyValue",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testCustomAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION custom_agg(f INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT custom_agg(f_int_1) from aggregate_test_table",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(0L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnregisteredFunction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION notRegistered() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT notRegistered();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage(
+        String.format("No implementation of scalar function notRegistered found in %s.", jarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJarContainsNoUdfProviders() {
+    assumeNotNull(jarPath);
+    assumeNotNull(emptyJarPath);
+    String sql =
+        String.format(
+            // Load an inhabited jar first so we can make sure jars load UdfProviders in isolation
+            // from other jars.
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "CREATE FUNCTION bar() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT bar();",
+            jarPath, emptyJarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(ProviderNotFoundException.class);
+    thrown.expectMessage(String.format("No UdfProvider implementation found in %s.", emptyJarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJavaUdfNoJarProvided() {
+    String sql = "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java; SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No jar was provided to define function foo.");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testPathOptionNotString() {
+    String sql =
+        "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path=23); SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Option 'path' has type TYPE_INT64 (expected TYPE_STRING).");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }

Review comment:
       That makes sense. Thanks!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] ibzib commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-718234442


   @amaliujia Rules are fixed, PTAL





[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513165094



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       @kennknowles Maybe we should first get an idea of the smaller interface you have in mind, in case we have been thinking of different things. Can you give an example? Then we can compare the pros and cons.







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512950764



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       Using our own interface would give us more flexibility with the implementation, but if our interface was solely a means of looking up a Method, I don't see much of a difference. If we want to add UDF-specific methods, there's no guarantee we could do that in a backward-compatible way even if we used a specialized interface.
   
   (Mostly unrelated fun fact: IntelliJ can autofill `this.getClass().getMethod` calls, so it's more convenient to use than I initially thought.)
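
   For illustration only, here is a minimal sketch of what looking up a `Method` from a provider could look like. The nested `UdfProvider` below is a hypothetical stand-in that mirrors just the scalar method from the diff (its empty-map default is an assumption), and `ExampleProvider`, `increment`, and `call` are made-up names, not part of this PR.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

class MethodLookupSketch {
  /** Hypothetical stand-in for the interface under review; the empty default is an assumption. */
  public interface UdfProvider {
    default Map<String, Method> userDefinedScalarFunctions() {
      return Collections.emptyMap();
    }
  }

  /** Hypothetical provider exposing a single scalar function named "increment". */
  public static class ExampleProvider implements UdfProvider {
    public Long increment(Long i) {
      return i + 1;
    }

    @Override
    public Map<String, Method> userDefinedScalarFunctions() {
      try {
        // getMethod resolves the public method by name and parameter types.
        return Collections.singletonMap(
            "increment", getClass().getMethod("increment", Long.class));
      } catch (NoSuchMethodException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  /** Looks a function up by name and invokes it reflectively on a fresh provider. */
  static Object call(String name, Object... args) {
    ExampleProvider provider = new ExampleProvider();
    Method method = provider.userDefinedScalarFunctions().get(name);
    if (method == null) {
      throw new IllegalArgumentException("No scalar function named " + name);
    }
    try {
      return method.invoke(provider, args);
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

   With this sketch, `MethodLookupSketch.call("increment", 1L)` goes through the map lookup and then `Method.invoke`, which is roughly the path a planner would take once it holds a `Method`.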







[GitHub] [beam] ibzib commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521687622



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -644,7 +649,7 @@ private RexNode convertResolvedFunctionCall(
           throw new UnsupportedOperationException(
               "Unsupported function: " + funName + ". Only support TUMBLE, HOP, and SESSION now.");
       }
-    } else if (funGroup.equals("ZetaSQL")) {
+    } else if (funGroup.equals(ZETASQL_FUNCTION_GROUP_NAME)) {

Review comment:
       Thanks, forgot about that. I fixed this part in #13309.







[GitHub] [beam] amaliujia commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-717396823


   The SQL test failures seem to be real. I will review the code first, and once the overall idea looks good, we can dig in to see why some tests failed.





[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513165094



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       @kennknowles Maybe we should first get an idea of the smaller interface you have in mind, in case we have been thinking of different things. Can you give a concrete example, say for a scalar function: what is the implementation, and what is returned from the interface? Then we can compare the pros and cons and see how to expand the interface.
   
   We can go back to the design doc to discuss this if you prefer. Right now I cannot find the original comment you mentioned, so starting a new comment is ok.







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512938754



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       I can see why you think `Method` carries too many things that might not be relevant. Ideally we only need users to provide the function signature (including the function name), which is enough to 1) give the SQL parser the information it requires and 2) locate the right implementation in the user jar.
   
   
   The reason we still use `Method` is that the current UDF implementation asks for a `Method`: [1] [2]. Since we will need to construct a `Method` eventually, exposing it in the interface does not seem like a bad choice.
   
   
   [1]: https://github.com/apache/beam/blob/5076255fda1700f0d3ac2a9e5e73372bdf3c59dd/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L96
   [2]: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/ScalarFunctionImpl.java#L111
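
   As a rough sketch of the "locate the right implementation in the user jar" step, providers could be discovered with `java.util.ServiceLoader` over a `URLClassLoader`. This is an assumption about the mechanism, not a description of what the PR does; the nested `UdfProvider` is a hypothetical marker interface, and `loadProviders` is a made-up helper name.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

class JarProviderLoaderSketch {
  /** Hypothetical marker interface standing in for the real UdfProvider. */
  public interface UdfProvider {}

  /** Returns the providers whose classes were defined by the given jar's own class loader. */
  static List<UdfProvider> loadProviders(String jarPath) {
    URL jarUrl;
    try {
      jarUrl = Paths.get(jarPath).toUri().toURL();
    } catch (MalformedURLException e) {
      throw new IllegalArgumentException(e);
    }
    // The loader is intentionally left open here: provider classes may load lazily later.
    ClassLoader loader =
        new URLClassLoader(new URL[] {jarUrl}, UdfProvider.class.getClassLoader());
    List<UdfProvider> providers = new ArrayList<>();
    for (UdfProvider provider : ServiceLoader.load(UdfProvider.class, loader)) {
      // Skip providers that were found through the parent loader (e.g. the classpath),
      // so that each jar is considered in isolation from other jars.
      if (provider.getClass().getClassLoader() == loader) {
        providers.add(provider);
      }
    }
    return providers;
  }
}
```

   A caller could treat an empty result as the "no provider found in this jar" case and raise an error naming the jar path.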







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513003261



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
##########
@@ -184,7 +190,9 @@ private BeamRelNode convertToBeamRelInternal(String sql, QueryParameters queryPa
     RelMetadataQuery.THREAD_PROVIDERS.set(
         JaninoRelMetadataProvider.of(root.rel.getCluster().getMetadataProvider()));
     root.rel.getCluster().invalidateMetadataQuery();
-    return (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
+    BeamRelNode beamRelNode = (BeamRelNode) plannerImpl.transform(0, desiredTraits, root.rel);
+    LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode));

Review comment:
       I see. Maybe just leave this line here. I don't know why we did not have this logging in the past, but matching what CalcitePlanner does is ok.







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r512975318



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       Agreed with what @ibzib has said.
   
   Another thought: having a `Method` in our interface is actually a good thing for its longer-term evolution. Once the interface is adopted, it will become harder to deprecate and upgrade. Having `Method` gives us more flexibility to enhance the UDF implementation without changing the interface. Information (e.g. annotations) that is not relevant now could become relevant later.
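
   To make the annotation point concrete, here is a small sketch of reading extra metadata off a `Method` without touching the provider interface. The `@Deterministic` annotation and the `Functions` class are purely hypothetical and are not proposed in this PR.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class MethodMetadataSketch {
  /** Hypothetical annotation; RUNTIME retention makes it visible to reflection. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  public @interface Deterministic {}

  /** Hypothetical UDF implementations. */
  public static class Functions {
    @Deterministic
    public static Long increment(Long i) {
      return i + 1;
    }

    public static Double random() {
      return Math.random();
    }
  }

  /** A consumer can inspect the Method it already holds; no interface change is needed. */
  static boolean isDeterministic(Method method) {
    return method.isAnnotationPresent(Deterministic.class);
  }

  static Method lookup(String name, Class<?>... parameterTypes) {
    try {
      return Functions.class.getMethod(name, parameterTypes);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }
}
```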







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r514495645



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property
+ * <code>beam.sql.udf.test.jar_path</code> is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT foo();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION fun(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT fun(\"a\", \"a\"), 'apple'='beta'",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField =
+        Schema.builder().addBooleanField("field1").addBooleanField("field2").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues(true, false).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBuiltinAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION agg_fun(str STRING) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT agg_fun(Value) from KeyValue",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testCustomAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION custom_agg(f INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT custom_agg(f_int_1) from aggregate_test_table",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(0L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnregisteredFunction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION notRegistered() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT notRegistered();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage(
+        String.format("No implementation of scalar function notRegistered found in %s.", jarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJarContainsNoUdfProviders() {
+    assumeNotNull(jarPath);
+    assumeNotNull(emptyJarPath);
+    String sql =
+        String.format(
+            // Load an inhabited jar first so we can make sure jars load UdfProviders in isolation
+            // from other jars.
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "CREATE FUNCTION bar() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT bar();",
+            jarPath, emptyJarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(ProviderNotFoundException.class);
+    thrown.expectMessage(String.format("No UdfProvider implementation found in %s.", emptyJarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJavaUdfNoJarProvided() {
+    String sql = "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java; SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No jar was provided to define function foo.");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testPathOptionNotString() {
+    String sql =
+        "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path=23); SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Option 'path' has type TYPE_INT64 (expected TYPE_STRING).");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }

Review comment:
       Can you add a case to verify `NULL` handling, e.g. passing a null value to `increment`? It is OK if it does not work, but it would be good to see the expected behavior and then document it.
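
To make the question concrete, here is a minimal sketch of why a null argument can fail for one UDF and work for another. The class and the `callIncrement` helper are hypothetical stand-ins, not code from this PR, and the sketch assumes the caller unboxes a possibly-null `Long` before invoking a UDF that takes a primitive `long`.

```java
/** Hypothetical stand-ins for the UDFs named in this thread; not the PR's code. */
final class NullHandlingSketch {
  /** Primitive parameter: a Java long cannot represent SQL NULL. */
  public static long increment(long i) {
    return i + 1;
  }

  /** Reference parameter: SQL NULL can be passed through as a Java null. */
  public static boolean isNull(String s) {
    return s == null;
  }

  /**
   * Calls increment with a boxed argument (assumption: the caller holds a Long).
   * Auto-unboxing a null Long throws NullPointerException before increment runs.
   */
  static String callIncrement(Long boxed) {
    try {
      return String.valueOf(increment(boxed));
    } catch (NullPointerException e) {
      return "NullPointerException";
    }
  }
}
```

Under that assumption, documenting the behavior could come down to stating whether a UDF's parameter types are primitives or reference types.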




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521577848



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.
+  @Test
+  public void testNullArgumentIsNotTypeChecked() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(0);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Could not compile CalcFn");
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION matches(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT matches(\"a\", \"a\"), 'apple'='beta'",

Review comment:
       This is a good point. As follow-up work, I will check for mixed usage of UDFs and non-UDF expressions and reject it.







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r513165094



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/UdfProvider.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.Combine;
+
+/**
+ * Provider for user-defined functions written in Java. Implementations should be annotated with
+ * {@link com.google.auto.service.AutoService}.
+ */
+public interface UdfProvider {
+  /** Maps function names to scalar function implementations. */
+  default Map<String, Method> userDefinedScalarFunctions() {

Review comment:
       @kennknowles Maybe we should get a concrete idea of the smaller interface you are thinking of (in case we have been thinking of different things)? Can you give a concrete example, say for a scalar function, of what the implementation is and what is returned from the interface? Then we can compare and see the pros and cons.
   
   We can go back to the design doc to discuss this if you prefer. Right now I cannot find the original comment you mentioned, so starting a new comment is OK.
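
As one reference point for that comparison, the following is a minimal sketch of how a scalar function might be exposed through the `Map<String, Method>` shape quoted above. `UdfProviderSketch` is a local stand-in, not the Beam interface; its empty-map default body is an assumption, since the quoted diff is cut off. `IncrementProvider` and `UdfLookupDemo` are hypothetical names used only for illustration.

```java
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;

/** Local stand-in mirroring the quoted method signature; not the Beam class itself. */
interface UdfProviderSketch {
  /** Maps function names to scalar function implementations (default body is assumed). */
  default Map<String, Method> userDefinedScalarFunctions() {
    return Collections.emptyMap();
  }
}

/** Hypothetical provider exposing a single scalar function named "increment". */
final class IncrementProvider implements UdfProviderSketch {
  public static Long increment(Long i) {
    return i + 1;
  }

  @Override
  public Map<String, Method> userDefinedScalarFunctions() {
    try {
      Method m = IncrementProvider.class.getMethod("increment", Long.class);
      return Collections.singletonMap("increment", m);
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }
}

/** Hypothetical caller: looks a function up by name and invokes it reflectively. */
final class UdfLookupDemo {
  static Object call(UdfProviderSketch provider, String name, Object... args) {
    Method m = provider.userDefinedScalarFunctions().get(name);
    if (m == null) {
      throw new IllegalArgumentException("No implementation of scalar function " + name);
    }
    try {
      return m.invoke(null, args); // static method, so no receiver object
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

With this shape the interface returns reflective handles and the implementation is an ordinary static method. A smaller interface would presumably trade that reflection for a typed function object.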







[GitHub] [beam] ibzib commented on pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
ibzib commented on pull request #13200:
URL: https://github.com/apache/beam/pull/13200#issuecomment-725706197


   I am closing this PR. It will be much easier to review and merge each logical piece separately. @apilloud I will address your comments as they come up in the new PRs.





[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r521577848



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.
+  @Test
+  public void testNullArgumentIsNotTypeChecked() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(0);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Could not compile CalcFn");
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION matches(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT matches(\"a\", \"a\"), 'apple'='beta'",

Review comment:
       Yes, you are right. This is expected behavior, as this is just an initial PR to achieve "execute Java UDF but might still have bugs". As follow-up work, I will check for mixed usage of UDFs and non-UDF expressions and reject it.







[GitHub] [beam] amaliujia commented on a change in pull request #13200: [BEAM-10925] Implement Java UDF in ZetaSQL.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #13200:
URL: https://github.com/apache/beam/pull/13200#discussion_r514566869



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assume.assumeNotNull;
+
+import java.nio.file.ProviderNotFoundException;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>Some tests will be ignored unless the system property <code>beam.sql.udf.test.jar_path</code>
+ * is set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPath = System.getProperty("beam.sql.udf.test.jar_path");
+  private final String emptyJarPath = System.getProperty("beam.sql.udf.test.empty_jar_path");
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT foo();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello world!").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION fun(str STRING, regStr STRING) RETURNS BOOLEAN LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT fun(\"a\", \"a\"), 'apple'='beta'",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField =
+        Schema.builder().addBooleanField("field1").addBooleanField("field2").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues(true, false).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testBuiltinAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION agg_fun(str STRING) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT agg_fun(Value) from KeyValue",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(2L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testCustomAggregateUdf() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE AGGREGATE FUNCTION custom_agg(f INT64) RETURNS INT64 LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT custom_agg(f_int_1) from aggregate_test_table",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValue(0L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnregisteredFunction() {
+    assumeNotNull(jarPath);
+    String sql =
+        String.format(
+            "CREATE FUNCTION notRegistered() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT notRegistered();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage(
+        String.format("No implementation of scalar function notRegistered found in %s.", jarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJarContainsNoUdfProviders() {
+    assumeNotNull(jarPath);
+    assumeNotNull(emptyJarPath);
+    String sql =
+        String.format(
+            // Load an inhabited jar first so we can make sure jars load UdfProviders in isolation
+            // from other jars.
+            "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "CREATE FUNCTION bar() RETURNS STRING LANGUAGE java OPTIONS (path='%s'); "
+                + "SELECT bar();",
+            jarPath, emptyJarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(ProviderNotFoundException.class);
+    thrown.expectMessage(String.format("No UdfProvider implementation found in %s.", emptyJarPath));
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testJavaUdfNoJarProvided() {
+    String sql = "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java; SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("No jar was provided to define function foo.");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testPathOptionNotString() {
+    String sql =
+        "CREATE FUNCTION foo() RETURNS STRING LANGUAGE java OPTIONS (path=23); SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Option 'path' has type TYPE_INT64 (expected TYPE_STRING).");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }

Review comment:
       That makes sense. Thanks!
   
   I will also help add checks after this PR is in, for example a check for mixed usage of built-in functions and UDFs.



