Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/04 00:55:57 UTC

[GitHub] [beam] apilloud commented on a change in pull request #13891: [BEAM-10925] Enable user-defined Java scalar functions in ZetaSQL.

apilloud commented on a change in pull request #13891:
URL: https://github.com/apache/beam/pull/13891#discussion_r569811118



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -127,6 +144,53 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in the statement. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {

Review comment:
       This looks unused.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -127,6 +144,53 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in the statement. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return String.format(
+        "%s:%s",
+        getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+  }
+
+  static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+    switch (createFunctionStmt.getLanguage().toUpperCase()) {
+      case "JAVA":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Java SQL aggregate functions are not supported (BEAM-10925).");
+        }
+        return USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+      case "SQL":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Native SQL aggregate functions are not supported (BEAM-9954).");
+        }
+        return USER_DEFINED_FUNCTIONS;
+      case "PY":
+      case "PYTHON":
+      case "JS":
+      case "JAVASCRIPT":
+        throw new UnsupportedOperationException(
+            String.format(
+                "Function %s uses unsupported language %s.",
+                String.join(".", createFunctionStmt.getNamePath()),
+                createFunctionStmt.getLanguage()));
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Function %s uses unrecognized language %s.",
+                String.join(".", createFunctionStmt.getNamePath()),
+                createFunctionStmt.getLanguage()));
+    }
+  }
+
+  private Function createFunction(ResolvedCreateFunctionStmt createFunctionStmt) {

Review comment:
       It seems like you bypassed a few abstraction layers here. `ResolvedCreateFunctionStmt` should probably add a UDF to the `TableProvider` (or an equivalent for UDFs). For an example, see how CREATE EXTERNAL TABLE is handled in the Calcite dialect: https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java#L136
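
A rough sketch of what an "equivalent for UDFs" could look like, purely for illustration. `UdfProvider`, `UdfDefinition`, and `InMemoryUdfProvider` are hypothetical names that do not exist in Beam; the point is only that the CREATE FUNCTION handler would register the function with a provider instead of loading it inline in the planner.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class UdfProviderSketch {
  /** Hypothetical description of a function parsed from a CREATE FUNCTION statement. */
  static final class UdfDefinition {
    final String name;
    final String language;
    final String jarPath;

    UdfDefinition(String name, String language, String jarPath) {
      this.name = name;
      this.language = language;
      this.jarPath = jarPath;
    }
  }

  /** Hypothetical analogue of TableProvider, but for functions. */
  interface UdfProvider {
    void createFunction(UdfDefinition definition);

    Optional<UdfDefinition> getFunction(String name);
  }

  /** Simplest possible provider: keeps definitions in a map. */
  static final class InMemoryUdfProvider implements UdfProvider {
    private final Map<String, UdfDefinition> functions = new LinkedHashMap<>();

    @Override
    public void createFunction(UdfDefinition definition) {
      // Reject duplicates rather than silently overwriting an earlier definition.
      if (functions.putIfAbsent(definition.name, definition) != null) {
        throw new IllegalArgumentException("Function already exists: " + definition.name);
      }
    }

    @Override
    public Optional<UdfDefinition> getFunction(String name) {
      return Optional.ofNullable(functions.get(name));
    }
  }

  public static void main(String[] args) {
    UdfProvider provider = new InMemoryUdfProvider();
    provider.createFunction(new UdfDefinition("my_fn", "JAVA", "/tmp/udf.jar"));
    System.out.println(provider.getFunction("my_fn").isPresent());
  }
}
```

With something along these lines, the statement handler only calls `createFunction`, and the schema layer decides how and when the function is actually loaded.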

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -127,6 +144,53 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
     return tables.build();
   }
 
+  /** Returns the fully qualified name of the function defined in the statement. */
+  static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+    return String.format(
+        "%s:%s",
+        getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+  }
+
+  static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+    switch (createFunctionStmt.getLanguage().toUpperCase()) {
+      case "JAVA":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Java SQL aggregate functions are not supported (BEAM-10925).");
+        }
+        return USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+      case "SQL":
+        if (createFunctionStmt.getIsAggregate()) {
+          throw new UnsupportedOperationException(
+              "Native SQL aggregate functions are not supported (BEAM-9954).");
+        }
+        return USER_DEFINED_FUNCTIONS;
+      case "PY":

Review comment:
       I'm curious as to where these came from. Is there another engine that supports these constants?

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -115,6 +116,22 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
         >= parseResumeLocation.getInput().getBytes(UTF_8).length;
   }
 
+  static String getOptionStringValue(
+      ResolvedCreateFunctionStmt createFunctionStmt, String optionName) {
+    for (ResolvedNodes.ResolvedOption option : createFunctionStmt.getOptionList()) {
+      if (option.getName().equals(optionName)) {
+        if (option.getValue().getType().getKind() != TypeKind.TYPE_STRING) {

Review comment:
       `getValue` can return null here. I didn't check the other two, but you can find a copy of the generated `ResolvedNodes.java` in internal code search. https://github.com/google/zetasql/blob/862a192a6da487757e860166a9666120b16773f5/java/com/google/zetasql/resolvedast/ResolvedNodes.java.template#L295
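
A minimal sketch of a null-tolerant lookup, to show the shape of the check. `Option` here is a hypothetical stand-in for `ResolvedNodes.ResolvedOption` (the real class exposes `getName()` and `getValue()`), and returning `Optional.empty()` for a missing option is an assumption, since the diff does not show what the method does in that case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class OptionLookupSketch {
  /** Hypothetical stand-in for ResolvedOption; either field may be null. */
  static final class Option {
    final String name;
    final Object value;

    Option(String name, Object value) {
      this.name = name;
      this.value = value;
    }
  }

  static Optional<String> getOptionStringValue(List<Option> options, String optionName) {
    for (Option option : options) {
      // Calling equals on the non-null argument tolerates a null option name.
      if (!optionName.equals(option.name)) {
        continue;
      }
      // instanceof is false for null, so this covers both a missing value and a wrong type.
      if (!(option.value instanceof String)) {
        throw new IllegalArgumentException(
            String.format("Option %s must be a non-null string.", optionName));
      }
      return Optional.of((String) option.value);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<Option> options =
        Arrays.asList(new Option(null, "ignored"), new Option("path", "/tmp/udf.jar"));
    System.out.println(getOptionStringValue(options, "path"));
  }
}
```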

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -115,6 +116,22 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
         >= parseResumeLocation.getInput().getBytes(UTF_8).length;
   }
 
+  static String getOptionStringValue(
+      ResolvedCreateFunctionStmt createFunctionStmt, String optionName) {
+    for (ResolvedNodes.ResolvedOption option : createFunctionStmt.getOptionList()) {
+      if (option.getName().equals(optionName)) {

Review comment:
       How about `optionName.equals(option.getName())`? That will avoid potential crashes if `getName` returns null.
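
To illustrate the difference, a small standalone sketch. The class and method names are made up for the example; only `String.equals` and `java.util.Objects.equals` are real APIs.

```java
import java.util.Objects;

final class NullSafeEqualsSketch {
  /** Safe when candidateName is null, as long as optionName itself is non-null. */
  static boolean matches(String optionName, String candidateName) {
    return optionName.equals(candidateName);
  }

  /** Throws NullPointerException when candidateName is null. */
  static boolean matchesUnsafe(String optionName, String candidateName) {
    return candidateName.equals(optionName);
  }

  /** Safe when either side is null. */
  static boolean matchesEitherNull(String left, String right) {
    return Objects.equals(left, right);
  }

  public static void main(String[] args) {
    System.out.println(matches("path", null));
    System.out.println(matchesEitherNull(null, "path"));
  }
}
```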

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -115,6 +116,22 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
         >= parseResumeLocation.getInput().getBytes(UTF_8).length;
   }
 
+  static String getOptionStringValue(

Review comment:
       nit: This method appears to be used exactly once in another file. It should go right next to the method that calls it.

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -104,14 +106,30 @@ public RelRoot rel(String sql, QueryParameters params) {
     ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
         ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, UserFunctionDefinitions.JavaScalarFunction>
+        javaScalarFunctionBuilder = ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
-        udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+        String functionGroup = SqlAnalyzer.getFunctionGroup(createFunctionStmt);
+        if (SqlAnalyzer.USER_DEFINED_FUNCTIONS.equals(functionGroup)) {

Review comment:
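       nit: `switch`/`case` reads better than an `if`/`else if` chain for this pattern, as long as your string can't be null (switching on a null string throws a `NullPointerException`).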
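
Something like the following sketch. The constant values and the returned labels are placeholders (the diff does not show the real values in `SqlAnalyzer`), and this only compiles if the constants are compile-time `String` constants, which is assumed here.

```java
final class FunctionGroupDispatchSketch {
  // Placeholder values; the real constants are defined in SqlAnalyzer.
  static final String USER_DEFINED_FUNCTIONS = "user_defined_functions";
  static final String USER_DEFINED_JAVA_SCALAR_FUNCTIONS = "user_defined_java_scalar_functions";

  /** Returns a label for the branch taken; the real code would populate the builders instead. */
  static String dispatch(String functionGroup) {
    // A null functionGroup throws NullPointerException at the switch itself.
    switch (functionGroup) {
      case USER_DEFINED_FUNCTIONS:
        return "sql-udf";
      case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
        return "java-scalar-udf";
      default:
        throw new IllegalArgumentException("Unknown function group: " + functionGroup);
    }
  }

  public static void main(String[] args) {
    System.out.println(dispatch(USER_DEFINED_JAVA_SCALAR_FUNCTIONS));
  }
}
```

The `default` branch also makes an unexpected group fail loudly instead of falling through silently.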

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -104,14 +106,30 @@ public RelRoot rel(String sql, QueryParameters params) {
     ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
         ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, UserFunctionDefinitions.JavaScalarFunction>
+        javaScalarFunctionBuilder = ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
-        udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+        String functionGroup = SqlAnalyzer.getFunctionGroup(createFunctionStmt);
+        if (SqlAnalyzer.USER_DEFINED_FUNCTIONS.equals(functionGroup)) {
+          udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+        } else if (SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS.equals(functionGroup)) {
+          String jarPath = getJarPath(createFunctionStmt);
+          ScalarFn scalarFn =
+              javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(), jarPath);

Review comment:
       Again on bypassing layers: it seems like all of this should live in a `TableProvider`-like interface and be built through a `buildBeamSqlUDF` method called from `BeamCalciteSchema` (see just above the line in this link for the table example):
   https://github.com/apache/beam/blob/68d6c8e6243b1d8f392840273f886276e2a8baff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java#L122
