You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2021/03/15 18:16:13 UTC

[beam] branch master updated: [BEAM-10943] Add builtin functions in the Calcite planner.

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 23ba50b  [BEAM-10943] Add builtin functions in the Calcite planner.
     new 3a9591f  Merge pull request #14222 from ibzib/BEAM-10943-builtins
23ba50b is described below

commit 23ba50b1d1d6d7d772888416c0fbb5ba32c88de6
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Fri Mar 12 14:19:37 2021 -0800

    [BEAM-10943] Add builtin functions in the Calcite planner.
    
    Load the builtin functions from CalciteQueryPlanner instead of BeamSqlEnv. This prevents the builtin functions, which aren't used in ZetaSQL, from being added to the schema when using the ZetaSQL planner.
---
 .../beam/sdk/extensions/sql/SqlTransform.java      |  1 -
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   | 37 ++--------------------
 .../extensions/sql/impl/CalciteQueryPlanner.java   | 15 +++++++++
 3 files changed, 18 insertions(+), 35 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index e7d49dc..a73cfac 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -143,7 +143,6 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
     BeamSqlEnvBuilder sqlEnvBuilder = BeamSqlEnv.builder(metaTableProvider);
 
     // TODO: validate duplicate functions.
-    sqlEnvBuilder.autoLoadBuiltinFunctions();
     registerFunctions(sqlEnvBuilder);
 
     // Load automatic table providers before user ones so the user ones will cause a conflict if
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index cbe3224..848135b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -19,13 +19,11 @@ package org.apache.beam.sdk.extensions.sql.impl;
 
 import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkNotNull;
 
-import java.lang.reflect.Method;
 import java.sql.SQLException;
 import java.util.AbstractMap.SimpleEntry;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
@@ -35,7 +33,6 @@ import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
-import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -148,7 +145,6 @@ public class BeamSqlEnv {
     private String currentSchemaName;
     private Map<String, TableProvider> schemaMap;
     private Set<Map.Entry<String, Function>> functionSet;
-    private boolean autoLoadBuiltinFunctions;
     private boolean autoLoadUdfs;
     private PipelineOptions pipelineOptions;
     private Collection<RuleSet> ruleSets;
@@ -161,7 +157,6 @@ public class BeamSqlEnv {
       schemaMap = new HashMap<>();
       functionSet = new HashSet<>();
       autoLoadUdfs = false;
-      autoLoadBuiltinFunctions = false;
       pipelineOptions = null;
       ruleSets = BeamRuleSets.getRuleSets();
     }
@@ -219,12 +214,6 @@ public class BeamSqlEnv {
       return this;
     }
 
-    /** Load Beam SQL built-in functions defined in {@link BeamBuiltinFunctionProvider}. */
-    public BeamSqlEnvBuilder autoLoadBuiltinFunctions() {
-      autoLoadBuiltinFunctions = true;
-      return this;
-    }
-
     public BeamSqlEnvBuilder setQueryPlannerClassName(String name) {
       queryPlannerClassName = name;
       return this;
@@ -247,14 +236,13 @@ public class BeamSqlEnv {
 
       configureSchemas(jdbcConnection);
 
-      loadBeamBuiltinFunctions();
+      QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets);
 
+      // The planner may choose to add its own builtin functions to the schema, so load user-defined
+      // functions second, in case there's a conflict.
       loadUdfs();
-
       addUdfsUdafs(jdbcConnection);
 
-      QueryPlanner planner = instantiatePlanner(jdbcConnection, ruleSets);
-
       return new BeamSqlEnv(jdbcConnection, planner);
     }
 
@@ -275,25 +263,6 @@ public class BeamSqlEnv {
       }
     }
 
-    private void loadBeamBuiltinFunctions() {
-      if (!autoLoadBuiltinFunctions) {
-        return;
-      }
-
-      for (BeamBuiltinFunctionProvider provider :
-          ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
-        loadBuiltinUdf(provider.getBuiltinMethods());
-      }
-    }
-
-    private void loadBuiltinUdf(Map<String, List<Method>> methods) {
-      for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
-        for (Method method : entry.getValue()) {
-          functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
-        }
-      }
-    }
-
     private void loadUdfs() {
       if (!autoLoadUdfs) {
         return;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index b24b973..ff5deb4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -17,9 +17,11 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.Factory;
 import org.apache.beam.sdk.extensions.sql.impl.QueryPlanner.QueryParameters.Kind;
@@ -27,6 +29,7 @@ import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
 import org.apache.beam.sdk.extensions.sql.impl.planner.RelMdNodeStats;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.udf.BeamBuiltinFunctionProvider;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.config.CalciteConnectionConfig;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteSchema;
@@ -92,8 +95,20 @@ public class CalciteQueryPlanner implements QueryPlanner {
         @Override
         public QueryPlanner createPlanner(
             JdbcConnection jdbcConnection, Collection<RuleSet> ruleSets) {
+          loadBuiltinFunctions(jdbcConnection);
           return new CalciteQueryPlanner(jdbcConnection, ruleSets);
         }
+
+        private void loadBuiltinFunctions(JdbcConnection jdbcConnection) {
+          for (BeamBuiltinFunctionProvider provider :
+              ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+            for (Map.Entry<String, List<Method>> entry : provider.getBuiltinMethods().entrySet()) {
+              for (Method method : entry.getValue()) {
+                jdbcConnection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method));
+              }
+            }
+          }
+        }
       };
 
   public FrameworkConfig defaultConfig(JdbcConnection connection, Collection<RuleSet> ruleSets) {