You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") in the original archive page and is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by an...@apache.org on 2019/05/08 03:11:16 UTC
[beam] branch master updated: [SQL] Refactor BeamSqlEnv
This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8135e39 [SQL] Refactor BeamSqlEnv
new 79a4637 Merge pull request #8523 from akedin/refactor-sqlenv
8135e39 is described below
commit 8135e39952a117152f8709daa27a9fabb66567ad
Author: akedin <ke...@google.com>
AuthorDate: Tue May 7 15:19:02 2019 -0700
[SQL] Refactor BeamSqlEnv
---
.../apache/beam/sdk/extensions/sql/BeamSqlCli.java | 10 +-
.../beam/sdk/extensions/sql/SqlTransform.java | 18 +-
.../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 207 ++++++++++++---------
.../sdk/extensions/sql/impl/BeamSqlEnvTest.java | 8 +-
.../extensions/sql/impl/parser/BeamDDLTest.java | 6 +-
5 files changed, 136 insertions(+), 113 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index d28ac2f..5e44c6c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -39,11 +39,11 @@ public class BeamSqlCli {
public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
this.metaStore = metaStore;
- this.env =
- BeamSqlEnv.builder()
- .setInitializeTableProvider(metaStore)
- .loadUdfUdafFromProvider()
- .build();
+ BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
+ if (autoLoadUdfUdaf) {
+ builder.autoLoadUserDefinedFunctions();
+ }
+ this.env = builder.build();
return this;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index f9d7700..e45daca 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -64,7 +64,7 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap
* PCollection<Row> outputTableA = inputTableA.apply(
* SqlTransform
* .query(sql1)
- * .registerUdf("MY_FUNC", MY_FUNC.class, "FUNC");
+ * .addUdf("MY_FUNC", MY_FUNC.class, "FUNC");
*
* //run a JOIN with one table from TextIO, and one table from another query
* PCollection<Row> outputTableB =
@@ -99,19 +99,20 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
@Override
public PCollection<Row> expand(PInput input) {
BeamSqlEnvBuilder sqlEnvBuilder =
- BeamSqlEnv.builder()
- .setInitializeTableProvider(
- new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input)));
+ BeamSqlEnv.builder(new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input)));
+
tableProviderMap().forEach(sqlEnvBuilder::addSchema);
+
if (defaultTableProvider() != null) {
sqlEnvBuilder.setCurrentSchema(defaultTableProvider());
}
// TODO: validate duplicate functions.
- sqlEnvBuilder.loadBeamBuiltinFunctions();
+ sqlEnvBuilder.autoLoadBuiltinFunctions();
registerFunctions(sqlEnvBuilder);
+
if (autoUdfUdafLoad()) {
- sqlEnvBuilder.loadUdfUdafFromProvider();
+ sqlEnvBuilder.autoLoadUserDefinedFunctions();
}
sqlEnvBuilder.setQueryPlannerClassName(
@@ -142,10 +143,9 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
private void registerFunctions(BeamSqlEnvBuilder sqlEnvBuilder) {
udfDefinitions()
- .forEach(udf -> sqlEnvBuilder.registerUdf(udf.udfName(), udf.clazz(), udf.methodName()));
+ .forEach(udf -> sqlEnvBuilder.addUdf(udf.udfName(), udf.clazz(), udf.methodName()));
- udafDefinitions()
- .forEach(udaf -> sqlEnvBuilder.registerUdaf(udaf.udafName(), udaf.combineFn()));
+ udafDefinitions().forEach(udaf -> sqlEnvBuilder.addUdaf(udaf.udafName(), udaf.combineFn()));
}
/**
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index df6a2e4..02b3e69 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,7 +17,8 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
-import java.lang.reflect.InvocationTargetException;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
+
import java.lang.reflect.Method;
import java.sql.SQLException;
import java.util.AbstractMap.SimpleEntry;
@@ -37,8 +38,9 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
-import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.schema.Function;
@@ -51,16 +53,17 @@ import org.apache.calcite.sql.SqlExecutableStatement;
@Internal
@Experimental
public class BeamSqlEnv {
- final JdbcConnection connection;
- final QueryPlanner planner;
+ JdbcConnection connection;
+ QueryPlanner planner;
private BeamSqlEnv(JdbcConnection connection, QueryPlanner planner) {
this.connection = connection;
this.planner = planner;
}
- public static BeamSqlEnvBuilder builder() {
- return new BeamSqlEnvBuilder();
+ /** Creates a builder with the default schema backed by the table provider. */
+ public static BeamSqlEnvBuilder builder(TableProvider tableProvider) {
+ return new BeamSqlEnvBuilder(tableProvider);
}
public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
@@ -68,7 +71,7 @@ public class BeamSqlEnv {
}
public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
- return builder().setInitializeTableProvider(tableProvider).build();
+ return builder(tableProvider).build();
}
public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
@@ -111,28 +114,28 @@ public class BeamSqlEnv {
/** BeamSqlEnv's Builder. */
public static class BeamSqlEnvBuilder {
- private String queryPlannerClassName =
+ private static final String CALCITE_PLANNER =
"org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner";
-
- private TableProvider initialTableProvider;
+ private String queryPlannerClassName;
+ private TableProvider defaultTableProvider;
private String currentSchemaName;
- private Map<String, TableProvider> schemaMap = new HashMap<>();
- private Set<Map.Entry<String, Function>> functionSet = new HashSet<>();
-
- public BeamSqlEnvBuilder setInitializeTableProvider(TableProvider tableProvider) {
- initialTableProvider = tableProvider;
- return this;
- }
-
- public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>> methods) {
- for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
- for (Method method : entry.getValue()) {
- functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
- }
- }
- return this;
+ private Map<String, TableProvider> schemaMap;
+ private Set<Map.Entry<String, Function>> functionSet;
+ private boolean autoLoadBuiltinFunctions;
+ private boolean autoLoadUdfs;
+
+ private BeamSqlEnvBuilder(TableProvider tableProvider) {
+ checkNotNull(tableProvider, "Table provider for the default schema must be sets.");
+
+ defaultTableProvider = tableProvider;
+ queryPlannerClassName = CALCITE_PLANNER;
+ schemaMap = new HashMap<>();
+ functionSet = new HashSet<>();
+ autoLoadUdfs = false;
+ autoLoadBuiltinFunctions = false;
}
+ /** Add a top-level schema backed by the table provider. */
public BeamSqlEnvBuilder addSchema(String name, TableProvider tableProvider) {
if (schemaMap.containsKey(name)) {
throw new RuntimeException("Schema " + name + " is registered twice.");
@@ -142,56 +145,47 @@ public class BeamSqlEnv {
return this;
}
+ /** Set the current (default) schema. */
public BeamSqlEnvBuilder setCurrentSchema(String name) {
currentSchemaName = name;
return this;
}
/** Register a UDF function which can be used in SQL expression. */
- public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz, String method) {
+ public BeamSqlEnvBuilder addUdf(String functionName, Class<?> clazz, String method) {
functionSet.add(new SimpleEntry<>(functionName, UdfImpl.create(clazz, method)));
-
return this;
}
/** Register a UDF function which can be used in SQL expression. */
- public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
- return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+ public BeamSqlEnvBuilder addUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+ return addUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
}
- public BeamSqlEnvBuilder registerUdf(String functionName, SerializableFunction sfn) {
- return registerUdf(functionName, sfn.getClass(), "apply");
+ /** Register a UDF function which can be used in SQL expression. */
+ public BeamSqlEnvBuilder addUdf(String functionName, SerializableFunction sfn) {
+ return addUdf(functionName, sfn.getClass(), "apply");
}
/**
- * Register a UDAF function which can be used in GROUP-BY expression. See {@link
- * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
+ * Register a UDAF function which can be used in GROUP-BY expression.
+ *
+ * <p>See {@link CombineFn} on how to implement a UDAF.
*/
- public BeamSqlEnvBuilder registerUdaf(String functionName, Combine.CombineFn combineFn) {
+ public BeamSqlEnvBuilder addUdaf(String functionName, CombineFn combineFn) {
functionSet.add(new SimpleEntry<>(functionName, new UdafImpl(combineFn)));
return this;
}
- /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
- public BeamSqlEnvBuilder loadUdfUdafFromProvider() {
- ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
- .forEach(
- ins -> {
- ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> registerUdf(udfName, udfClass));
- ins.getSerializableFunctionUdfs()
- .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
- ins.getUdafs().forEach((udafName, udafFn) -> registerUdaf(udafName, udafFn));
- });
-
+ /** Load UDF/UDAFs from {@link UdfUdafProvider}. */
+ public BeamSqlEnvBuilder autoLoadUserDefinedFunctions() {
+ autoLoadUdfs = true;
return this;
}
- public BeamSqlEnvBuilder loadBeamBuiltinFunctions() {
- for (BeamBuiltinFunctionProvider provider :
- ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
- registerBuiltinUdf(provider.getBuiltinMethods());
- }
-
+ /** Load Beam SQL built-in functions defined in {@link BeamBuiltinFunctionProvider}. */
+ public BeamSqlEnvBuilder autoLoadBuiltinFunctions() {
+ autoLoadBuiltinFunctions = true;
return this;
}
@@ -206,56 +200,93 @@ public class BeamSqlEnv {
* @return BeamSqlEnv.
*/
public BeamSqlEnv build() {
- // This check is to retain backward compatible because most of BeamSqlEnv are initialized by
- // withTableProvider API.
- if (initialTableProvider == null) {
- throw new RuntimeException("initialTableProvider must be set in BeamSqlEnvBuilder.");
+
+ JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider);
+
+ configureSchemas(jdbcConnection);
+
+ loadBeamBuiltinFunctions();
+
+ loadUdfs();
+
+ addUdfsUdafs(jdbcConnection);
+
+ QueryPlanner planner = instantiatePlanner(jdbcConnection);
+
+ return new BeamSqlEnv(jdbcConnection, planner);
+ }
+
+ private void configureSchemas(JdbcConnection jdbcConnection) {
+ // SetSchema adds the schema with the specified name
+ // backed by the table provider.
+ // Does not update the current default schema.
+ schemaMap.forEach(jdbcConnection::setSchema);
+
+ if (Strings.isNullOrEmpty(currentSchemaName)) {
+ return;
+ }
+
+ try {
+ jdbcConnection.setSchema(currentSchemaName);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
}
+ }
- JdbcConnection jdbcConnection = JdbcDriver.connect(initialTableProvider);
+ private void loadBeamBuiltinFunctions() {
+ if (!autoLoadBuiltinFunctions) {
+ return;
+ }
- // set schema
- for (Map.Entry<String, TableProvider> schemaEntry : schemaMap.entrySet()) {
- jdbcConnection.setSchema(schemaEntry.getKey(), schemaEntry.getValue());
+ for (BeamBuiltinFunctionProvider provider :
+ ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+ loadBuiltinUdf(provider.getBuiltinMethods());
}
+ }
- // reset default schema
- if (currentSchemaName != null) {
- try {
- jdbcConnection.setSchema(currentSchemaName);
- } catch (SQLException e) {
- throw new RuntimeException(e);
+ private void loadBuiltinUdf(Map<String, List<Method>> methods) {
+ for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+ for (Method method : entry.getValue()) {
+ functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
}
}
+ }
+
+ private void loadUdfs() {
+ if (!autoLoadUdfs) {
+ return;
+ }
+
+ ServiceLoader.load(UdfUdafProvider.class)
+ .forEach(
+ ins -> {
+ ins.getBeamSqlUdfs().forEach(this::addUdf);
+ ins.getSerializableFunctionUdfs().forEach(this::addUdf);
+ ins.getUdafs().forEach(this::addUdaf);
+ });
+ }
- // add UDF
+ private void addUdfsUdafs(JdbcConnection connection) {
for (Map.Entry<String, Function> functionEntry : functionSet) {
- jdbcConnection.getCurrentSchemaPlus().add(functionEntry.getKey(), functionEntry.getValue());
+ connection.getCurrentSchemaPlus().add(functionEntry.getKey(), functionEntry.getValue());
}
+ }
- QueryPlanner planner;
-
- if (queryPlannerClassName.equals(
- "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner")) {
- planner = new CalciteQueryPlanner(jdbcConnection);
- } else {
- try {
- planner =
- (QueryPlanner)
- Class.forName(queryPlannerClassName)
- .getConstructor(JdbcConnection.class)
- .newInstance(jdbcConnection);
- } catch (NoSuchMethodException
- | ClassNotFoundException
- | InstantiationException
- | IllegalAccessException
- | InvocationTargetException e) {
- throw new RuntimeException(
- String.format("Cannot construct query planner %s", queryPlannerClassName), e);
- }
+ private QueryPlanner instantiatePlanner(JdbcConnection jdbcConnection) {
+
+ if (queryPlannerClassName.equals(CALCITE_PLANNER)) {
+ return new CalciteQueryPlanner(jdbcConnection);
}
- return new BeamSqlEnv(jdbcConnection, planner);
+ try {
+ return (QueryPlanner)
+ Class.forName(queryPlannerClassName)
+ .getConstructor(JdbcConnection.class)
+ .newInstance(jdbcConnection);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Cannot construct query planner %s", queryPlannerClassName), e);
+ }
}
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index 578a4aa..517309d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -39,8 +39,7 @@ public class BeamSqlEnvTest {
TestTableProvider anotherOne = new TestTableProvider();
BeamSqlEnv env =
- BeamSqlEnv.builder()
- .setInitializeTableProvider(root)
+ BeamSqlEnv.builder(root)
.addSchema("nested", nested)
.addSchema("anotherOne", anotherOne)
.build();
@@ -61,9 +60,6 @@ public class BeamSqlEnvTest {
exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
TestTableProvider root = new TestTableProvider();
- BeamSqlEnv.builder()
- .setInitializeTableProvider(root)
- .setQueryPlannerClassName("org.test.ClassNotFound")
- .build();
+ BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build();
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index 74e6282..b7f4215 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -167,11 +167,7 @@ public class BeamDDLTest {
TestTableProvider rootProvider = new TestTableProvider();
TestTableProvider testProvider = new TestTableProvider();
- BeamSqlEnv env =
- BeamSqlEnv.builder()
- .setInitializeTableProvider(rootProvider)
- .addSchema("test", testProvider)
- .build();
+ BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build();
assertNull(testProvider.getTables().get("person"));
env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");