You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/05/08 03:11:16 UTC

[beam] branch master updated: [SQL] Refactor BeamSqlEnv

This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8135e39  [SQL] Refactor BeamSqlEnv
     new 79a4637  Merge pull request #8523 from akedin/refactor-sqlenv
8135e39 is described below

commit 8135e39952a117152f8709daa27a9fabb66567ad
Author: akedin <ke...@google.com>
AuthorDate: Tue May 7 15:19:02 2019 -0700

    [SQL] Refactor BeamSqlEnv
---
 .../apache/beam/sdk/extensions/sql/BeamSqlCli.java |  10 +-
 .../beam/sdk/extensions/sql/SqlTransform.java      |  18 +-
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   | 207 ++++++++++++---------
 .../sdk/extensions/sql/impl/BeamSqlEnvTest.java    |   8 +-
 .../extensions/sql/impl/parser/BeamDDLTest.java    |   6 +-
 5 files changed, 136 insertions(+), 113 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index d28ac2f..5e44c6c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -39,11 +39,11 @@ public class BeamSqlCli {
 
   public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
     this.metaStore = metaStore;
-    this.env =
-        BeamSqlEnv.builder()
-            .setInitializeTableProvider(metaStore)
-            .loadUdfUdafFromProvider()
-            .build();
+    BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
+    if (autoLoadUdfUdaf) {
+      builder.autoLoadUserDefinedFunctions();
+    }
+    this.env = builder.build();
     return this;
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index f9d7700..e45daca 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -64,7 +64,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap
  * PCollection<Row> outputTableA = inputTableA.apply(
  *    SqlTransform
  *        .query(sql1)
- *        .registerUdf("MY_FUNC", MY_FUNC.class, "FUNC");
+ *        .addUdf("MY_FUNC", MY_FUNC.class, "FUNC");
  *
  * //run a JOIN with one table from TextIO, and one table from another query
  * PCollection<Row> outputTableB =
@@ -99,19 +99,20 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
   @Override
   public PCollection<Row> expand(PInput input) {
     BeamSqlEnvBuilder sqlEnvBuilder =
-        BeamSqlEnv.builder()
-            .setInitializeTableProvider(
-                new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input)));
+        BeamSqlEnv.builder(new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input)));
+
     tableProviderMap().forEach(sqlEnvBuilder::addSchema);
+
     if (defaultTableProvider() != null) {
       sqlEnvBuilder.setCurrentSchema(defaultTableProvider());
     }
 
     // TODO: validate duplicate functions.
-    sqlEnvBuilder.loadBeamBuiltinFunctions();
+    sqlEnvBuilder.autoLoadBuiltinFunctions();
     registerFunctions(sqlEnvBuilder);
+
     if (autoUdfUdafLoad()) {
-      sqlEnvBuilder.loadUdfUdafFromProvider();
+      sqlEnvBuilder.autoLoadUserDefinedFunctions();
     }
 
     sqlEnvBuilder.setQueryPlannerClassName(
@@ -142,10 +143,9 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
 
   private void registerFunctions(BeamSqlEnvBuilder sqlEnvBuilder) {
     udfDefinitions()
-        .forEach(udf -> sqlEnvBuilder.registerUdf(udf.udfName(), udf.clazz(), udf.methodName()));
+        .forEach(udf -> sqlEnvBuilder.addUdf(udf.udfName(), udf.clazz(), udf.methodName()));
 
-    udafDefinitions()
-        .forEach(udaf -> sqlEnvBuilder.registerUdaf(udaf.udafName(), udaf.combineFn()));
+    udafDefinitions().forEach(udaf -> sqlEnvBuilder.addUdaf(udaf.udafName(), udaf.combineFn()));
   }
 
   /**
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index df6a2e4..02b3e69 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
-import java.lang.reflect.InvocationTargetException;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+
 import java.lang.reflect.Method;
 import java.sql.SQLException;
 import java.util.AbstractMap.SimpleEntry;
@@ -37,8 +38,9 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.schema.Function;
@@ -51,16 +53,17 @@ import org.apache.calcite.sql.SqlExecutableStatement;
 @Internal
 @Experimental
 public class BeamSqlEnv {
-  final JdbcConnection connection;
-  final QueryPlanner planner;
+  JdbcConnection connection;
+  QueryPlanner planner;
 
   private BeamSqlEnv(JdbcConnection connection, QueryPlanner planner) {
     this.connection = connection;
     this.planner = planner;
   }
 
-  public static BeamSqlEnvBuilder builder() {
-    return new BeamSqlEnvBuilder();
+  /** Creates a builder with the default schema backed by the table provider. */
+  public static BeamSqlEnvBuilder builder(TableProvider tableProvider) {
+    return new BeamSqlEnvBuilder(tableProvider);
   }
 
   public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
@@ -68,7 +71,7 @@ public class BeamSqlEnv {
   }
 
   public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
-    return builder().setInitializeTableProvider(tableProvider).build();
+    return builder(tableProvider).build();
   }
 
   public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
@@ -111,28 +114,28 @@ public class BeamSqlEnv {
 
   /** BeamSqlEnv's Builder. */
   public static class BeamSqlEnvBuilder {
-    private String queryPlannerClassName =
+    private static final String CALCITE_PLANNER =
         "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner";
-
-    private TableProvider initialTableProvider;
+    private String queryPlannerClassName;
+    private TableProvider defaultTableProvider;
     private String currentSchemaName;
-    private Map<String, TableProvider> schemaMap = new HashMap<>();
-    private Set<Map.Entry<String, Function>> functionSet = new HashSet<>();
-
-    public BeamSqlEnvBuilder setInitializeTableProvider(TableProvider tableProvider) {
-      initialTableProvider = tableProvider;
-      return this;
-    }
-
-    public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>> methods) {
-      for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
-        for (Method method : entry.getValue()) {
-          functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
-        }
-      }
-      return this;
+    private Map<String, TableProvider> schemaMap;
+    private Set<Map.Entry<String, Function>> functionSet;
+    private boolean autoLoadBuiltinFunctions;
+    private boolean autoLoadUdfs;
+
+    private BeamSqlEnvBuilder(TableProvider tableProvider) {
+      checkNotNull(tableProvider, "Table provider for the default schema must be set.");
+
+      defaultTableProvider = tableProvider;
+      queryPlannerClassName = CALCITE_PLANNER;
+      schemaMap = new HashMap<>();
+      functionSet = new HashSet<>();
+      autoLoadUdfs = false;
+      autoLoadBuiltinFunctions = false;
     }
 
+    /** Add a top-level schema backed by the table provider. */
     public BeamSqlEnvBuilder addSchema(String name, TableProvider tableProvider) {
       if (schemaMap.containsKey(name)) {
         throw new RuntimeException("Schema " + name + " is registered twice.");
@@ -142,56 +145,47 @@ public class BeamSqlEnv {
       return this;
     }
 
+    /** Set the current (default) schema. */
     public BeamSqlEnvBuilder setCurrentSchema(String name) {
       currentSchemaName = name;
       return this;
     }
 
     /** Register a UDF function which can be used in SQL expression. */
-    public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz, String method) {
+    public BeamSqlEnvBuilder addUdf(String functionName, Class<?> clazz, String method) {
       functionSet.add(new SimpleEntry<>(functionName, UdfImpl.create(clazz, method)));
-
       return this;
     }
 
     /** Register a UDF function which can be used in SQL expression. */
-    public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
-      return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+    public BeamSqlEnvBuilder addUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+      return addUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
     }
 
-    public BeamSqlEnvBuilder registerUdf(String functionName, SerializableFunction sfn) {
-      return registerUdf(functionName, sfn.getClass(), "apply");
+    /** Register a UDF function which can be used in SQL expression. */
+    public BeamSqlEnvBuilder addUdf(String functionName, SerializableFunction sfn) {
+      return addUdf(functionName, sfn.getClass(), "apply");
     }
 
     /**
-     * Register a UDAF function which can be used in GROUP-BY expression. See {@link
-     * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
+     * Register a UDAF function which can be used in a GROUP BY expression.
+     *
+     * <p>See {@link CombineFn} on how to implement a UDAF.
      */
-    public BeamSqlEnvBuilder registerUdaf(String functionName, Combine.CombineFn combineFn) {
+    public BeamSqlEnvBuilder addUdaf(String functionName, CombineFn combineFn) {
       functionSet.add(new SimpleEntry<>(functionName, new UdafImpl(combineFn)));
       return this;
     }
 
-    /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
-    public BeamSqlEnvBuilder loadUdfUdafFromProvider() {
-      ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
-          .forEach(
-              ins -> {
-                ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> registerUdf(udfName, udfClass));
-                ins.getSerializableFunctionUdfs()
-                    .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
-                ins.getUdafs().forEach((udafName, udafFn) -> registerUdaf(udafName, udafFn));
-              });
-
+    /** Load UDF/UDAFs from {@link UdfUdafProvider}. */
+    public BeamSqlEnvBuilder autoLoadUserDefinedFunctions() {
+      autoLoadUdfs = true;
       return this;
     }
 
-    public BeamSqlEnvBuilder loadBeamBuiltinFunctions() {
-      for (BeamBuiltinFunctionProvider provider :
-          ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
-        registerBuiltinUdf(provider.getBuiltinMethods());
-      }
-
+    /** Load Beam SQL built-in functions defined in {@link BeamBuiltinFunctionProvider}. */
+    public BeamSqlEnvBuilder autoLoadBuiltinFunctions() {
+      autoLoadBuiltinFunctions = true;
       return this;
     }
 
@@ -206,56 +200,93 @@ public class BeamSqlEnv {
      * @return BeamSqlEnv.
      */
     public BeamSqlEnv build() {
-      // This check is to retain backward compatible because most of BeamSqlEnv are initialized by
-      // withTableProvider API.
-      if (initialTableProvider == null) {
-        throw new RuntimeException("initialTableProvider must be set in BeamSqlEnvBuilder.");
+
+      JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider);
+
+      configureSchemas(jdbcConnection);
+
+      loadBeamBuiltinFunctions();
+
+      loadUdfs();
+
+      addUdfsUdafs(jdbcConnection);
+
+      QueryPlanner planner = instantiatePlanner(jdbcConnection);
+
+      return new BeamSqlEnv(jdbcConnection, planner);
+    }
+
+    private void configureSchemas(JdbcConnection jdbcConnection) {
+      // The two-argument setSchema registers a schema under the given name,
+      // backed by the given table provider.
+      // It does not change the connection's current (default) schema.
+      schemaMap.forEach(jdbcConnection::setSchema);
+
+      if (Strings.isNullOrEmpty(currentSchemaName)) {
+        return;
+      }
+
+      try {
+        jdbcConnection.setSchema(currentSchemaName);
+      } catch (SQLException e) {
+        throw new RuntimeException(e);
       }
+    }
 
-      JdbcConnection jdbcConnection = JdbcDriver.connect(initialTableProvider);
+    private void loadBeamBuiltinFunctions() {
+      if (!autoLoadBuiltinFunctions) {
+        return;
+      }
 
-      // set schema
-      for (Map.Entry<String, TableProvider> schemaEntry : schemaMap.entrySet()) {
-        jdbcConnection.setSchema(schemaEntry.getKey(), schemaEntry.getValue());
+      for (BeamBuiltinFunctionProvider provider :
+          ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+        loadBuiltinUdf(provider.getBuiltinMethods());
       }
+    }
 
-      // reset default schema
-      if (currentSchemaName != null) {
-        try {
-          jdbcConnection.setSchema(currentSchemaName);
-        } catch (SQLException e) {
-          throw new RuntimeException(e);
+    private void loadBuiltinUdf(Map<String, List<Method>> methods) {
+      for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+        for (Method method : entry.getValue()) {
+          functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
         }
       }
+    }
+
+    private void loadUdfs() {
+      if (!autoLoadUdfs) {
+        return;
+      }
+
+      ServiceLoader.load(UdfUdafProvider.class)
+          .forEach(
+              ins -> {
+                ins.getBeamSqlUdfs().forEach(this::addUdf);
+                ins.getSerializableFunctionUdfs().forEach(this::addUdf);
+                ins.getUdafs().forEach(this::addUdaf);
+              });
+    }
 
-      // add UDF
+    private void addUdfsUdafs(JdbcConnection connection) {
       for (Map.Entry<String, Function> functionEntry : functionSet) {
-        jdbcConnection.getCurrentSchemaPlus().add(functionEntry.getKey(), functionEntry.getValue());
+        connection.getCurrentSchemaPlus().add(functionEntry.getKey(), functionEntry.getValue());
       }
+    }
 
-      QueryPlanner planner;
-
-      if (queryPlannerClassName.equals(
-          "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner")) {
-        planner = new CalciteQueryPlanner(jdbcConnection);
-      } else {
-        try {
-          planner =
-              (QueryPlanner)
-                  Class.forName(queryPlannerClassName)
-                      .getConstructor(JdbcConnection.class)
-                      .newInstance(jdbcConnection);
-        } catch (NoSuchMethodException
-            | ClassNotFoundException
-            | InstantiationException
-            | IllegalAccessException
-            | InvocationTargetException e) {
-          throw new RuntimeException(
-              String.format("Cannot construct query planner %s", queryPlannerClassName), e);
-        }
+    private QueryPlanner instantiatePlanner(JdbcConnection jdbcConnection) {
+
+      if (queryPlannerClassName.equals(CALCITE_PLANNER)) {
+        return new CalciteQueryPlanner(jdbcConnection);
       }
 
-      return new BeamSqlEnv(jdbcConnection, planner);
+      try {
+        return (QueryPlanner)
+            Class.forName(queryPlannerClassName)
+                .getConstructor(JdbcConnection.class)
+                .newInstance(jdbcConnection);
+      } catch (Exception e) {
+        throw new RuntimeException(
+            String.format("Cannot construct query planner %s", queryPlannerClassName), e);
+      }
     }
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index 578a4aa..517309d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -39,8 +39,7 @@ public class BeamSqlEnvTest {
     TestTableProvider anotherOne = new TestTableProvider();
 
     BeamSqlEnv env =
-        BeamSqlEnv.builder()
-            .setInitializeTableProvider(root)
+        BeamSqlEnv.builder(root)
             .addSchema("nested", nested)
             .addSchema("anotherOne", anotherOne)
             .build();
@@ -61,9 +60,6 @@ public class BeamSqlEnvTest {
     exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
 
     TestTableProvider root = new TestTableProvider();
-    BeamSqlEnv.builder()
-        .setInitializeTableProvider(root)
-        .setQueryPlannerClassName("org.test.ClassNotFound")
-        .build();
+    BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build();
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index 74e6282..b7f4215 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -167,11 +167,7 @@ public class BeamDDLTest {
     TestTableProvider rootProvider = new TestTableProvider();
     TestTableProvider testProvider = new TestTableProvider();
 
-    BeamSqlEnv env =
-        BeamSqlEnv.builder()
-            .setInitializeTableProvider(rootProvider)
-            .addSchema("test", testProvider)
-            .build();
+    BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build();
     assertNull(testProvider.getTables().get("person"));
     env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");