You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/05/07 21:38:18 UTC
[beam] branch master updated: [BEAM-5644] make Planner configurable.
This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d2851bf [BEAM-5644] make Planner configurable.
new c25b34a Merge pull request #7745 from amaliujia/rw-configurable_planner
d2851bf is described below
commit d2851bff01796ac678a4e8f2b2c971c2dc0a1a13
Author: amaliujia <am...@163.com>
AuthorDate: Tue Feb 5 20:16:06 2019 -0800
[BEAM-5644] make Planner configurable.
---
.../apache/beam/sdk/extensions/sql/BeamSqlCli.java | 10 +-
.../beam/sdk/extensions/sql/SqlTransform.java | 29 ++-
.../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 254 ++++++++++++++-------
.../sql/impl/BeamSqlPipelineOptions.java | 32 +++
.../sql/impl/BeamSqlPipelineOptionsRegistrar.java | 33 +++
...mQueryPlanner.java => CalciteQueryPlanner.java} | 31 +--
.../beam/sdk/extensions/sql/impl/QueryPlanner.java | 33 +++
.../sql/impl/SqlConversionException.java | 30 +++
.../sdk/extensions/sql/impl/BeamSqlEnvTest.java | 26 ++-
.../extensions/sql/impl/parser/BeamDDLTest.java | 8 +-
10 files changed, 365 insertions(+), 121 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 6d2c167..d28ac2f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -39,11 +39,11 @@ public class BeamSqlCli {
public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
this.metaStore = metaStore;
- this.env = BeamSqlEnv.withTableProvider(metaStore);
- if (autoLoadUdfUdaf) {
- env.loadUdfUdafFromProvider();
- }
-
+ this.env =
+ BeamSqlEnv.builder()
+ .setInitializeTableProvider(metaStore)
+ .loadUdfUdafFromProvider()
+ .build();
return this;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index f5613c9..f9d7700 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -25,8 +25,11 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.BeamSqlEnvBuilder;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.PTransform;
@@ -95,19 +98,26 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
@Override
public PCollection<Row> expand(PInput input) {
- BeamSqlEnv sqlEnv = BeamSqlEnv.readOnly(PCOLLECTION_NAME, toTableMap(input));
- tableProviderMap().forEach(sqlEnv::addSchema);
+ BeamSqlEnvBuilder sqlEnvBuilder =
+ BeamSqlEnv.builder()
+ .setInitializeTableProvider(
+ new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input)));
+ tableProviderMap().forEach(sqlEnvBuilder::addSchema);
if (defaultTableProvider() != null) {
- sqlEnv.setCurrentSchema(defaultTableProvider());
+ sqlEnvBuilder.setCurrentSchema(defaultTableProvider());
}
// TODO: validate duplicate functions.
- sqlEnv.loadBeamBuiltinFunctions();
- registerFunctions(sqlEnv);
+ sqlEnvBuilder.loadBeamBuiltinFunctions();
+ registerFunctions(sqlEnvBuilder);
if (autoUdfUdafLoad()) {
- sqlEnv.loadUdfUdafFromProvider();
+ sqlEnvBuilder.loadUdfUdafFromProvider();
}
+ sqlEnvBuilder.setQueryPlannerClassName(
+ input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName());
+
+ BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString()));
}
@@ -130,11 +140,12 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
return tables.build();
}
- private void registerFunctions(BeamSqlEnv sqlEnv) {
+ private void registerFunctions(BeamSqlEnvBuilder sqlEnvBuilder) {
udfDefinitions()
- .forEach(udf -> sqlEnv.registerUdf(udf.udfName(), udf.clazz(), udf.methodName()));
+ .forEach(udf -> sqlEnvBuilder.registerUdf(udf.udfName(), udf.clazz(), udf.methodName()));
- udafDefinitions().forEach(udaf -> sqlEnv.registerUdaf(udaf.udafName(), udaf.combineFn()));
+ udafDefinitions()
+ .forEach(udaf -> sqlEnvBuilder.registerUdaf(udaf.udafName(), udaf.combineFn()));
}
/**
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index e1589bd..df6a2e4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,11 +17,16 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.sql.SQLException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
+import java.util.Set;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
@@ -36,10 +41,8 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.schema.Function;
import org.apache.calcite.sql.SqlExecutableStatement;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
/**
* Contains the metadata of tables/UDF functions, and exposes APIs to
@@ -49,11 +52,15 @@ import org.apache.calcite.tools.ValidationException;
@Experimental
public class BeamSqlEnv {
final JdbcConnection connection;
- final BeamQueryPlanner planner;
+ final QueryPlanner planner;
- private BeamSqlEnv(TableProvider tableProvider) {
- connection = JdbcDriver.connect(tableProvider);
- planner = new BeamQueryPlanner(connection);
+ private BeamSqlEnv(JdbcConnection connection, QueryPlanner planner) {
+ this.connection = connection;
+ this.planner = planner;
+ }
+
+ public static BeamSqlEnvBuilder builder() {
+ return new BeamSqlEnvBuilder();
}
public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
@@ -61,7 +68,7 @@ public class BeamSqlEnv {
}
public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
- return new BeamSqlEnv(tableProvider);
+ return builder().setInitializeTableProvider(tableProvider).build();
}
public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
@@ -73,109 +80,182 @@ public class BeamSqlEnv {
return withTableProvider(inMemoryMetaStore);
}
- private void registerBuiltinUdf(Map<String, List<Method>> methods) {
- for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
- for (Method method : entry.getValue()) {
- connection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method));
- }
- }
+ public BeamRelNode parseQuery(String query) throws ParseException {
+ return planner.convertToBeamRel(query);
}
- public void addSchema(String name, TableProvider tableProvider) {
- connection.setSchema(name, tableProvider);
+ public boolean isDdl(String sqlStatement) throws ParseException {
+ return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
}
- public void setCurrentSchema(String name) {
- try {
- connection.setSchema(name);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
+ public void executeDdl(String sqlStatement) throws ParseException {
+ SqlExecutableStatement ddl = (SqlExecutableStatement) planner.parse(sqlStatement);
+ ddl.execute(getContext());
}
- /** Register a UDF function which can be used in SQL expression. */
- public void registerUdf(String functionName, Class<?> clazz, String method) {
- connection.getCurrentSchemaPlus().add(functionName, UdfImpl.create(clazz, method));
+ public CalcitePrepare.Context getContext() {
+ return connection.createPrepareContext();
}
- /** Register a UDF function which can be used in SQL expression. */
- public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
- registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+ public Map<String, String> getPipelineOptions() {
+ return connection.getPipelineOptionsMap();
}
- /**
- * Register {@link SerializableFunction} as a UDF function which can be used in SQL expression.
- * Note, {@link SerializableFunction} must have a constructor without arguments.
- */
- public void registerUdf(String functionName, SerializableFunction sfn) {
- registerUdf(functionName, sfn.getClass(), "apply");
+ public String explain(String sqlString) throws ParseException {
+ try {
+ return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
+ } catch (Exception e) {
+ throw new ParseException("Unable to parse statement", e);
+ }
}
- /**
- * Register a UDAF function which can be used in GROUP-BY expression. See {@link
- * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
- */
- public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
- connection.getCurrentSchemaPlus().add(functionName, new UdafImpl(combineFn));
- }
+ /** BeamSqlEnv's Builder. */
+ public static class BeamSqlEnvBuilder {
+ private String queryPlannerClassName =
+ "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner";
- /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
- public void loadUdfUdafFromProvider() {
- ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
- .forEach(
- ins -> {
- ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> registerUdf(udfName, udfClass));
- ins.getSerializableFunctionUdfs()
- .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
- ins.getUdafs().forEach((udafName, udafFn) -> registerUdaf(udafName, udafFn));
- });
- }
+ private TableProvider initialTableProvider;
+ private String currentSchemaName;
+ private Map<String, TableProvider> schemaMap = new HashMap<>();
+ private Set<Map.Entry<String, Function>> functionSet = new HashSet<>();
- public void loadBeamBuiltinFunctions() {
- for (BeamBuiltinFunctionProvider provider :
- ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
- registerBuiltinUdf(provider.getBuiltinMethods());
+ public BeamSqlEnvBuilder setInitializeTableProvider(TableProvider tableProvider) {
+ initialTableProvider = tableProvider;
+ return this;
}
- }
- public BeamRelNode parseQuery(String query) throws ParseException {
- try {
- return planner.convertToBeamRel(query);
- } catch (ValidationException | RelConversionException | SqlParseException e) {
- throw new ParseException(String.format("Unable to parse query %s", query), e);
+ public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>> methods) {
+ for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+ for (Method method : entry.getValue()) {
+ functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
+ }
+ }
+ return this;
}
- }
- public boolean isDdl(String sqlStatement) throws ParseException {
- try {
- return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
- } catch (SqlParseException e) {
- throw new ParseException("Unable to parse statement", e);
+ public BeamSqlEnvBuilder addSchema(String name, TableProvider tableProvider) {
+ if (schemaMap.containsKey(name)) {
+ throw new RuntimeException("Schema " + name + " is registered twice.");
+ }
+
+ schemaMap.put(name, tableProvider);
+ return this;
}
- }
- public void executeDdl(String sqlStatement) throws ParseException {
- try {
- SqlExecutableStatement ddl = (SqlExecutableStatement) planner.parse(sqlStatement);
- ddl.execute(getContext());
- } catch (SqlParseException e) {
- throw new ParseException("Unable to parse DDL statement", e);
+ public BeamSqlEnvBuilder setCurrentSchema(String name) {
+ currentSchemaName = name;
+ return this;
}
- }
- public CalcitePrepare.Context getContext() {
- return connection.createPrepareContext();
- }
+ /** Register a UDF function which can be used in SQL expression. */
+ public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz, String method) {
+ functionSet.add(new SimpleEntry<>(functionName, UdfImpl.create(clazz, method)));
- public Map<String, String> getPipelineOptions() {
- return connection.getPipelineOptionsMap();
- }
+ return this;
+ }
- public String explain(String sqlString) throws ParseException {
- try {
- return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
- } catch (ValidationException | RelConversionException | SqlParseException e) {
- throw new ParseException("Unable to parse statement", e);
+ /** Register a UDF function which can be used in SQL expression. */
+ public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+ return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+ }
+
+ public BeamSqlEnvBuilder registerUdf(String functionName, SerializableFunction sfn) {
+ return registerUdf(functionName, sfn.getClass(), "apply");
+ }
+
+ /**
+ * Register a UDAF function which can be used in GROUP-BY expression. See {@link
+ * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
+ */
+ public BeamSqlEnvBuilder registerUdaf(String functionName, Combine.CombineFn combineFn) {
+ functionSet.add(new SimpleEntry<>(functionName, new UdafImpl(combineFn)));
+ return this;
+ }
+
+ /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
+ public BeamSqlEnvBuilder loadUdfUdafFromProvider() {
+ ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
+ .forEach(
+ ins -> {
+ ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> registerUdf(udfName, udfClass));
+ ins.getSerializableFunctionUdfs()
+ .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
+ ins.getUdafs().forEach((udafName, udafFn) -> registerUdaf(udafName, udafFn));
+ });
+
+ return this;
+ }
+
+ public BeamSqlEnvBuilder loadBeamBuiltinFunctions() {
+ for (BeamBuiltinFunctionProvider provider :
+ ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+ registerBuiltinUdf(provider.getBuiltinMethods());
+ }
+
+ return this;
+ }
+
+ public BeamSqlEnvBuilder setQueryPlannerClassName(String name) {
+ queryPlannerClassName = name;
+ return this;
+ }
+
+ /**
+ * Build function to create an instance of BeamSqlEnv based on preset fields.
+ *
+ * @return BeamSqlEnv.
+ */
+ public BeamSqlEnv build() {
+ // This check is kept for backward compatibility, because most BeamSqlEnv instances are
+ // initialized via the withTableProvider API.
+ if (initialTableProvider == null) {
+ throw new RuntimeException("initialTableProvider must be set in BeamSqlEnvBuilder.");
+ }
+
+ JdbcConnection jdbcConnection = JdbcDriver.connect(initialTableProvider);
+
+ // set schema
+ for (Map.Entry<String, TableProvider> schemaEntry : schemaMap.entrySet()) {
+ jdbcConnection.setSchema(schemaEntry.getKey(), schemaEntry.getValue());
+ }
+
+ // reset default schema
+ if (currentSchemaName != null) {
+ try {
+ jdbcConnection.setSchema(currentSchemaName);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ // add UDF
+ for (Map.Entry<String, Function> functionEntry : functionSet) {
+ jdbcConnection.getCurrentSchemaPlus().add(functionEntry.getKey(), functionEntry.getValue());
+ }
+
+ QueryPlanner planner;
+
+ if (queryPlannerClassName.equals(
+ "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner")) {
+ planner = new CalciteQueryPlanner(jdbcConnection);
+ } else {
+ try {
+ planner =
+ (QueryPlanner)
+ Class.forName(queryPlannerClassName)
+ .getConstructor(JdbcConnection.class)
+ .newInstance(jdbcConnection);
+ } catch (NoSuchMethodException
+ | ClassNotFoundException
+ | InstantiationException
+ | IllegalAccessException
+ | InvocationTargetException e) {
+ throw new RuntimeException(
+ String.format("Cannot construct query planner %s", queryPlannerClassName), e);
+ }
+ }
+
+ return new BeamSqlEnv(jdbcConnection, planner);
}
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
new file mode 100644
index 0000000..58d02a1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/** Options used to configure BeamSQL. */
+public interface BeamSqlPipelineOptions extends PipelineOptions {
+
+ @Description("QueryPlanner class name.")
+ @Default.String("org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner")
+ String getPlannerName();
+
+ void setPlannerName(String className);
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..4f5cc37
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+
+/** {@link AutoService} registrar for {@link BeamSqlPipelineOptions}. */
+@AutoService(PipelineOptionsRegistrar.class)
+public class BeamSqlPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.of(BeamSqlPipelineOptions.class);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
similarity index 84%
rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index 8399360..beab059 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -51,16 +51,16 @@ import org.slf4j.LoggerFactory;
* The core component that processes a SQL statement end to end, from explaining the execution
* plan to generating a Beam pipeline.
*/
-class BeamQueryPlanner {
- private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
+class CalciteQueryPlanner implements QueryPlanner {
+ private static final Logger LOG = LoggerFactory.getLogger(CalciteQueryPlanner.class);
- private JdbcConnection connection;
+ private final Planner planner;
- BeamQueryPlanner(JdbcConnection connection) {
- this.connection = connection;
+ CalciteQueryPlanner(JdbcConnection connection) {
+ planner = Frameworks.getPlanner(defaultConfig(connection));
}
- public FrameworkConfig config() {
+ public FrameworkConfig defaultConfig(JdbcConnection connection) {
final CalciteConnectionConfig config = connection.config();
final SqlParser.ConfigBuilder parserConfig =
SqlParser.configBuilder()
@@ -102,11 +102,13 @@ class BeamQueryPlanner {
}
/** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
- public SqlNode parse(String sqlStatement) throws SqlParseException {
- Planner planner = getPlanner();
+ @Override
+ public SqlNode parse(String sqlStatement) throws ParseException {
SqlNode parsed;
try {
parsed = planner.parse(sqlStatement);
+ } catch (SqlParseException e) {
+ throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
} finally {
planner.close();
}
@@ -114,10 +116,10 @@ class BeamQueryPlanner {
}
/** It parses and validates the input query, then converts it into a {@link BeamRelNode} tree. */
+ @Override
public BeamRelNode convertToBeamRel(String sqlStatement)
- throws ValidationException, RelConversionException, SqlParseException, CannotPlanException {
+ throws ParseException, SqlConversionException {
BeamRelNode beamRelNode;
- Planner planner = getPlanner();
try {
SqlNode parsed = planner.parse(sqlStatement);
SqlNode validated = planner.validate(parsed);
@@ -137,13 +139,14 @@ class BeamQueryPlanner {
// beam physical plan
beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel);
LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode));
+ } catch (RelConversionException | CannotPlanException e) {
+ throw new SqlConversionException(
+ String.format("Unable to convert query %s", sqlStatement), e);
+ } catch (SqlParseException | ValidationException e) {
+ throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
} finally {
planner.close();
}
return beamRelNode;
}
-
- private Planner getPlanner() {
- return Frameworks.getPlanner(config());
- }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
new file mode 100644
index 0000000..0593921
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.calcite.sql.SqlNode;
+
+/**
+ * An interface that planners should implement to convert a SQL statement to a {@link BeamRelNode}
+ * or a {@link SqlNode}.
+ */
+public interface QueryPlanner {
+ /** It parses and validates the input query, then converts it into a {@link BeamRelNode} tree. */
+ BeamRelNode convertToBeamRel(String sqlStatement) throws ParseException, SqlConversionException;
+
+ /** Parse the input SQL query, and return a {@link SqlNode} as the grammar tree. */
+ SqlNode parse(String sqlStatement) throws ParseException;
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/SqlConversionException.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/SqlConversionException.java
new file mode 100644
index 0000000..f24801f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/SqlConversionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+/** Exception thrown when Beam SQL cannot convert a SQL statement to a BeamRelNode. */
+public class SqlConversionException extends RuntimeException {
+
+ public SqlConversionException(Throwable cause) {
+ super(cause);
+ }
+
+ public SqlConversionException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index b812e4c..578a4aa 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -17,15 +17,20 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import java.sql.Connection;
import java.sql.ResultSet;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
/** Tests for {@link BeamSqlEnv}. */
public class BeamSqlEnvTest {
+ @Rule public ExpectedException exceptions = ExpectedException.none();
@Test
public void testCreateExternalTableInNestedTableProvider() throws Exception {
@@ -33,9 +38,12 @@ public class BeamSqlEnvTest {
TestTableProvider nested = new TestTableProvider();
TestTableProvider anotherOne = new TestTableProvider();
- BeamSqlEnv env = BeamSqlEnv.withTableProvider(root);
- env.addSchema("nested", nested);
- env.addSchema("anotherOne", anotherOne);
+ BeamSqlEnv env =
+ BeamSqlEnv.builder()
+ .setInitializeTableProvider(root)
+ .addSchema("nested", nested)
+ .addSchema("anotherOne", anotherOne)
+ .build();
Connection connection = env.connection;
connection.createStatement().execute("CREATE EXTERNAL TABLE nested.person (id INT) TYPE test");
@@ -46,4 +54,16 @@ public class BeamSqlEnvTest {
assertEquals(9, rs.getInt(1));
}
+
+ @Test
+ public void testPlannerClassNotFound() {
+ exceptions.expect(RuntimeException.class);
+ exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
+
+ TestTableProvider root = new TestTableProvider();
+ BeamSqlEnv.builder()
+ .setInitializeTableProvider(root)
+ .setQueryPlannerClassName("org.test.ClassNotFound")
+ .build();
+ }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index 53e05f7..74e6282 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -167,9 +167,11 @@ public class BeamDDLTest {
TestTableProvider rootProvider = new TestTableProvider();
TestTableProvider testProvider = new TestTableProvider();
- BeamSqlEnv env = BeamSqlEnv.withTableProvider(rootProvider);
- env.addSchema("test", testProvider);
-
+ BeamSqlEnv env =
+ BeamSqlEnv.builder()
+ .setInitializeTableProvider(rootProvider)
+ .addSchema("test", testProvider)
+ .build();
assertNull(testProvider.getTables().get("person"));
env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");