You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/05/07 21:38:18 UTC

[beam] branch master updated: [BEAM-5644] make Planner configurable.

This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d2851bf  [BEAM-5644] make Planner configurable.
     new c25b34a  Merge pull request #7745 from amaliujia/rw-configurable_planner
d2851bf is described below

commit d2851bff01796ac678a4e8f2b2c971c2dc0a1a13
Author: amaliujia <am...@163.com>
AuthorDate: Tue Feb 5 20:16:06 2019 -0800

    [BEAM-5644] make Planner configurable.
---
 .../apache/beam/sdk/extensions/sql/BeamSqlCli.java |  10 +-
 .../beam/sdk/extensions/sql/SqlTransform.java      |  29 ++-
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   | 254 ++++++++++++++-------
 .../sql/impl/BeamSqlPipelineOptions.java           |  32 +++
 .../sql/impl/BeamSqlPipelineOptionsRegistrar.java  |  33 +++
 ...mQueryPlanner.java => CalciteQueryPlanner.java} |  31 +--
 .../beam/sdk/extensions/sql/impl/QueryPlanner.java |  33 +++
 .../sql/impl/SqlConversionException.java           |  30 +++
 .../sdk/extensions/sql/impl/BeamSqlEnvTest.java    |  26 ++-
 .../extensions/sql/impl/parser/BeamDDLTest.java    |   8 +-
 10 files changed, 365 insertions(+), 121 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 6d2c167..d28ac2f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -39,11 +39,11 @@ public class BeamSqlCli {
 
   public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
     this.metaStore = metaStore;
-    this.env = BeamSqlEnv.withTableProvider(metaStore);
-    if (autoLoadUdfUdaf) {
-      env.loadUdfUdafFromProvider();
-    }
-
+    this.env =
+        BeamSqlEnv.builder()
+            .setInitializeTableProvider(metaStore)
+            .loadUdfUdafFromProvider()
+            .build();
     return this;
   }
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index f5613c9..f9d7700 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -25,8 +25,11 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.BeamSqlEnvBuilder;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -95,19 +98,26 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
 
   @Override
   public PCollection<Row> expand(PInput input) {
-    BeamSqlEnv sqlEnv = BeamSqlEnv.readOnly(PCOLLECTION_NAME, toTableMap(input));
-    tableProviderMap().forEach(sqlEnv::addSchema);
+    BeamSqlEnvBuilder sqlEnvBuilder =
+        BeamSqlEnv.builder()
+            .setInitializeTableProvider(
+                new ReadOnlyTableProvider(PCOLLECTION_NAME, toTableMap(input)));
+    tableProviderMap().forEach(sqlEnvBuilder::addSchema);
     if (defaultTableProvider() != null) {
-      sqlEnv.setCurrentSchema(defaultTableProvider());
+      sqlEnvBuilder.setCurrentSchema(defaultTableProvider());
     }
 
     // TODO: validate duplicate functions.
-    sqlEnv.loadBeamBuiltinFunctions();
-    registerFunctions(sqlEnv);
+    sqlEnvBuilder.loadBeamBuiltinFunctions();
+    registerFunctions(sqlEnvBuilder);
     if (autoUdfUdafLoad()) {
-      sqlEnv.loadUdfUdafFromProvider();
+      sqlEnvBuilder.loadUdfUdafFromProvider();
     }
 
+    sqlEnvBuilder.setQueryPlannerClassName(
+        input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName());
+
+    BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
     return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString()));
   }
 
@@ -130,11 +140,12 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
     return tables.build();
   }
 
-  private void registerFunctions(BeamSqlEnv sqlEnv) {
+  private void registerFunctions(BeamSqlEnvBuilder sqlEnvBuilder) {
     udfDefinitions()
-        .forEach(udf -> sqlEnv.registerUdf(udf.udfName(), udf.clazz(), udf.methodName()));
+        .forEach(udf -> sqlEnvBuilder.registerUdf(udf.udfName(), udf.clazz(), udf.methodName()));
 
-    udafDefinitions().forEach(udaf -> sqlEnv.registerUdaf(udaf.udafName(), udaf.combineFn()));
+    udafDefinitions()
+        .forEach(udaf -> sqlEnvBuilder.registerUdaf(udaf.udafName(), udaf.combineFn()));
   }
 
   /**
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index e1589bd..df6a2e4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -17,11 +17,16 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.sql.SQLException;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
+import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
@@ -36,10 +41,8 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.schema.Function;
 import org.apache.calcite.sql.SqlExecutableStatement;
-import org.apache.calcite.sql.parser.SqlParseException;
-import org.apache.calcite.tools.RelConversionException;
-import org.apache.calcite.tools.ValidationException;
 
 /**
  * Contains the metadata of tables/UDF functions, and exposes APIs to
@@ -49,11 +52,15 @@ import org.apache.calcite.tools.ValidationException;
 @Experimental
 public class BeamSqlEnv {
   final JdbcConnection connection;
-  final BeamQueryPlanner planner;
+  final QueryPlanner planner;
 
-  private BeamSqlEnv(TableProvider tableProvider) {
-    connection = JdbcDriver.connect(tableProvider);
-    planner = new BeamQueryPlanner(connection);
+  private BeamSqlEnv(JdbcConnection connection, QueryPlanner planner) {
+    this.connection = connection;
+    this.planner = planner;
+  }
+
+  public static BeamSqlEnvBuilder builder() {
+    return new BeamSqlEnvBuilder();
   }
 
   public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
@@ -61,7 +68,7 @@ public class BeamSqlEnv {
   }
 
   public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
-    return new BeamSqlEnv(tableProvider);
+    return builder().setInitializeTableProvider(tableProvider).build();
   }
 
   public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
@@ -73,109 +80,182 @@ public class BeamSqlEnv {
     return withTableProvider(inMemoryMetaStore);
   }
 
-  private void registerBuiltinUdf(Map<String, List<Method>> methods) {
-    for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
-      for (Method method : entry.getValue()) {
-        connection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method));
-      }
-    }
+  public BeamRelNode parseQuery(String query) throws ParseException {
+    return planner.convertToBeamRel(query);
   }
 
-  public void addSchema(String name, TableProvider tableProvider) {
-    connection.setSchema(name, tableProvider);
+  public boolean isDdl(String sqlStatement) throws ParseException {
+    return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
   }
 
-  public void setCurrentSchema(String name) {
-    try {
-      connection.setSchema(name);
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
+  public void executeDdl(String sqlStatement) throws ParseException {
+    SqlExecutableStatement ddl = (SqlExecutableStatement) planner.parse(sqlStatement);
+    ddl.execute(getContext());
   }
 
-  /** Register a UDF function which can be used in SQL expression. */
-  public void registerUdf(String functionName, Class<?> clazz, String method) {
-    connection.getCurrentSchemaPlus().add(functionName, UdfImpl.create(clazz, method));
+  public CalcitePrepare.Context getContext() {
+    return connection.createPrepareContext();
   }
 
-  /** Register a UDF function which can be used in SQL expression. */
-  public void registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
-    registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+  public Map<String, String> getPipelineOptions() {
+    return connection.getPipelineOptionsMap();
   }
 
-  /**
-   * Register {@link SerializableFunction} as a UDF function which can be used in SQL expression.
-   * Note, {@link SerializableFunction} must have a constructor without arguments.
-   */
-  public void registerUdf(String functionName, SerializableFunction sfn) {
-    registerUdf(functionName, sfn.getClass(), "apply");
+  public String explain(String sqlString) throws ParseException {
+    try {
+      return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
+    } catch (Exception e) {
+      throw new ParseException("Unable to parse statement", e);
+    }
   }
 
-  /**
-   * Register a UDAF function which can be used in GROUP-BY expression. See {@link
-   * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
-   */
-  public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
-    connection.getCurrentSchemaPlus().add(functionName, new UdafImpl(combineFn));
-  }
+  /** BeamSqlEnv's Builder. */
+  public static class BeamSqlEnvBuilder {
+    private String queryPlannerClassName =
+        "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner";
 
-  /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
-  public void loadUdfUdafFromProvider() {
-    ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
-        .forEach(
-            ins -> {
-              ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> registerUdf(udfName, udfClass));
-              ins.getSerializableFunctionUdfs()
-                  .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
-              ins.getUdafs().forEach((udafName, udafFn) -> registerUdaf(udafName, udafFn));
-            });
-  }
+    private TableProvider initialTableProvider;
+    private String currentSchemaName;
+    private Map<String, TableProvider> schemaMap = new HashMap<>();
+    private Set<Map.Entry<String, Function>> functionSet = new HashSet<>();
 
-  public void loadBeamBuiltinFunctions() {
-    for (BeamBuiltinFunctionProvider provider :
-        ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
-      registerBuiltinUdf(provider.getBuiltinMethods());
+    public BeamSqlEnvBuilder setInitializeTableProvider(TableProvider tableProvider) {
+      initialTableProvider = tableProvider;
+      return this;
     }
-  }
 
-  public BeamRelNode parseQuery(String query) throws ParseException {
-    try {
-      return planner.convertToBeamRel(query);
-    } catch (ValidationException | RelConversionException | SqlParseException e) {
-      throw new ParseException(String.format("Unable to parse query %s", query), e);
+    public BeamSqlEnvBuilder registerBuiltinUdf(Map<String, List<Method>> methods) {
+      for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
+        for (Method method : entry.getValue()) {
+          functionSet.add(new SimpleEntry<>(entry.getKey(), UdfImpl.create(method)));
+        }
+      }
+      return this;
     }
-  }
 
-  public boolean isDdl(String sqlStatement) throws ParseException {
-    try {
-      return planner.parse(sqlStatement) instanceof SqlExecutableStatement;
-    } catch (SqlParseException e) {
-      throw new ParseException("Unable to parse statement", e);
+    public BeamSqlEnvBuilder addSchema(String name, TableProvider tableProvider) {
+      if (schemaMap.containsKey(name)) {
+        throw new RuntimeException("Schema " + name + " is registered twice.");
+      }
+
+      schemaMap.put(name, tableProvider);
+      return this;
     }
-  }
 
-  public void executeDdl(String sqlStatement) throws ParseException {
-    try {
-      SqlExecutableStatement ddl = (SqlExecutableStatement) planner.parse(sqlStatement);
-      ddl.execute(getContext());
-    } catch (SqlParseException e) {
-      throw new ParseException("Unable to parse DDL statement", e);
+    public BeamSqlEnvBuilder setCurrentSchema(String name) {
+      currentSchemaName = name;
+      return this;
     }
-  }
 
-  public CalcitePrepare.Context getContext() {
-    return connection.createPrepareContext();
-  }
+    /** Register a UDF function which can be used in SQL expression. */
+    public BeamSqlEnvBuilder registerUdf(String functionName, Class<?> clazz, String method) {
+      functionSet.add(new SimpleEntry<>(functionName, UdfImpl.create(clazz, method)));
 
-  public Map<String, String> getPipelineOptions() {
-    return connection.getPipelineOptionsMap();
-  }
+      return this;
+    }
 
-  public String explain(String sqlString) throws ParseException {
-    try {
-      return RelOptUtil.toString(planner.convertToBeamRel(sqlString));
-    } catch (ValidationException | RelConversionException | SqlParseException e) {
-      throw new ParseException("Unable to parse statement", e);
+    /** Register a UDF function which can be used in SQL expression. */
+    public BeamSqlEnvBuilder registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
+      return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
+    }
+
+    public BeamSqlEnvBuilder registerUdf(String functionName, SerializableFunction sfn) {
+      return registerUdf(functionName, sfn.getClass(), "apply");
+    }
+
+    /**
+     * Register a UDAF function which can be used in GROUP-BY expression. See {@link
+     * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
+     */
+    public BeamSqlEnvBuilder registerUdaf(String functionName, Combine.CombineFn combineFn) {
+      functionSet.add(new SimpleEntry<>(functionName, new UdafImpl(combineFn)));
+      return this;
+    }
+
+    /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
+    public BeamSqlEnvBuilder loadUdfUdafFromProvider() {
+      ServiceLoader.<UdfUdafProvider>load(UdfUdafProvider.class)
+          .forEach(
+              ins -> {
+                ins.getBeamSqlUdfs().forEach((udfName, udfClass) -> registerUdf(udfName, udfClass));
+                ins.getSerializableFunctionUdfs()
+                    .forEach((udfName, udfFn) -> registerUdf(udfName, udfFn));
+                ins.getUdafs().forEach((udafName, udafFn) -> registerUdaf(udafName, udafFn));
+              });
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder loadBeamBuiltinFunctions() {
+      for (BeamBuiltinFunctionProvider provider :
+          ServiceLoader.load(BeamBuiltinFunctionProvider.class)) {
+        registerBuiltinUdf(provider.getBuiltinMethods());
+      }
+
+      return this;
+    }
+
+    public BeamSqlEnvBuilder setQueryPlannerClassName(String name) {
+      queryPlannerClassName = name;
+      return this;
+    }
+
+    /**
+     * Build function to create an instance of BeamSqlEnv based on preset fields.
+     *
+     * @return BeamSqlEnv.
+     */
+    public BeamSqlEnv build() {
+      // This check is to retain backward compatible because most of BeamSqlEnv are initialized by
+      // withTableProvider API.
+      if (initialTableProvider == null) {
+        throw new RuntimeException("initialTableProvider must be set in BeamSqlEnvBuilder.");
+      }
+
+      JdbcConnection jdbcConnection = JdbcDriver.connect(initialTableProvider);
+
+      // set schema
+      for (Map.Entry<String, TableProvider> schemaEntry : schemaMap.entrySet()) {
+        jdbcConnection.setSchema(schemaEntry.getKey(), schemaEntry.getValue());
+      }
+
+      // reset default schema
+      if (currentSchemaName != null) {
+        try {
+          jdbcConnection.setSchema(currentSchemaName);
+        } catch (SQLException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      // add UDF
+      for (Map.Entry<String, Function> functionEntry : functionSet) {
+        jdbcConnection.getCurrentSchemaPlus().add(functionEntry.getKey(), functionEntry.getValue());
+      }
+
+      QueryPlanner planner;
+
+      if (queryPlannerClassName.equals(
+          "org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner")) {
+        planner = new CalciteQueryPlanner(jdbcConnection);
+      } else {
+        try {
+          planner =
+              (QueryPlanner)
+                  Class.forName(queryPlannerClassName)
+                      .getConstructor(JdbcConnection.class)
+                      .newInstance(jdbcConnection);
+        } catch (NoSuchMethodException
+            | ClassNotFoundException
+            | InstantiationException
+            | IllegalAccessException
+            | InvocationTargetException e) {
+          throw new RuntimeException(
+              String.format("Cannot construct query planner %s", queryPlannerClassName), e);
+        }
+      }
+
+      return new BeamSqlEnv(jdbcConnection, planner);
     }
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
new file mode 100644
index 0000000..58d02a1
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptions.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/** Options used to configure BeamSQL. */
+public interface BeamSqlPipelineOptions extends PipelineOptions {
+
+  @Description("QueryPlanner class name.")
+  @Default.String("org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner")
+  String getPlannerName();
+
+  void setPlannerName(String className);
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java
new file mode 100644
index 0000000..4f5cc37
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlPipelineOptionsRegistrar.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import com.google.auto.service.AutoService;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+
+/** {@link AutoService} registrar for {@link BeamSqlPipelineOptions}. */
+@AutoService(PipelineOptionsRegistrar.class)
+public class BeamSqlPipelineOptionsRegistrar implements PipelineOptionsRegistrar {
+
+  @Override
+  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+    return ImmutableList.of(BeamSqlPipelineOptions.class);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
similarity index 84%
rename from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
rename to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
index 8399360..beab059 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteQueryPlanner.java
@@ -51,16 +51,16 @@ import org.slf4j.LoggerFactory;
  * The core component to handle through a SQL statement, from explain execution plan, to generate a
  * Beam pipeline.
  */
-class BeamQueryPlanner {
-  private static final Logger LOG = LoggerFactory.getLogger(BeamQueryPlanner.class);
+class CalciteQueryPlanner implements QueryPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(CalciteQueryPlanner.class);
 
-  private JdbcConnection connection;
+  private final Planner planner;
 
-  BeamQueryPlanner(JdbcConnection connection) {
-    this.connection = connection;
+  CalciteQueryPlanner(JdbcConnection connection) {
+    planner = Frameworks.getPlanner(defaultConfig(connection));
   }
 
-  public FrameworkConfig config() {
+  public FrameworkConfig defaultConfig(JdbcConnection connection) {
     final CalciteConnectionConfig config = connection.config();
     final SqlParser.ConfigBuilder parserConfig =
         SqlParser.configBuilder()
@@ -102,11 +102,13 @@ class BeamQueryPlanner {
   }
 
   /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
-  public SqlNode parse(String sqlStatement) throws SqlParseException {
-    Planner planner = getPlanner();
+  @Override
+  public SqlNode parse(String sqlStatement) throws ParseException {
     SqlNode parsed;
     try {
       parsed = planner.parse(sqlStatement);
+    } catch (SqlParseException e) {
+      throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
     } finally {
       planner.close();
     }
@@ -114,10 +116,10 @@ class BeamQueryPlanner {
   }
 
   /** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */
+  @Override
   public BeamRelNode convertToBeamRel(String sqlStatement)
-      throws ValidationException, RelConversionException, SqlParseException, CannotPlanException {
+      throws ParseException, SqlConversionException {
     BeamRelNode beamRelNode;
-    Planner planner = getPlanner();
     try {
       SqlNode parsed = planner.parse(sqlStatement);
       SqlNode validated = planner.validate(parsed);
@@ -137,13 +139,14 @@ class BeamQueryPlanner {
       // beam physical plan
       beamRelNode = (BeamRelNode) planner.transform(0, desiredTraits, root.rel);
       LOG.info("BEAMPlan>\n" + RelOptUtil.toString(beamRelNode));
+    } catch (RelConversionException | CannotPlanException e) {
+      throw new SqlConversionException(
+          String.format("Unable to convert query %s", sqlStatement), e);
+    } catch (SqlParseException | ValidationException e) {
+      throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
     } finally {
       planner.close();
     }
     return beamRelNode;
   }
-
-  private Planner getPlanner() {
-    return Frameworks.getPlanner(config());
-  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
new file mode 100644
index 0000000..0593921
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/QueryPlanner.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.calcite.sql.SqlNode;
+
+/**
+ * An interface that planners should implement to convert sql statement to {@link BeamRelNode} or
+ * {@link SqlNode}.
+ */
+public interface QueryPlanner {
+  /** It parses and validate the input query, then convert into a {@link BeamRelNode} tree. */
+  BeamRelNode convertToBeamRel(String sqlStatement) throws ParseException, SqlConversionException;
+
+  /** Parse input SQL query, and return a {@link SqlNode} as grammar tree. */
+  SqlNode parse(String sqlStatement) throws ParseException;
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/SqlConversionException.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/SqlConversionException.java
new file mode 100644
index 0000000..f24801f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/SqlConversionException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+/** Exception thrown when BeamSQL cannot convert sql to BeamRelNode. */
+public class SqlConversionException extends RuntimeException {
+
+  public SqlConversionException(Throwable cause) {
+    super(cause);
+  }
+
+  public SqlConversionException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index b812e4c..578a4aa 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -17,15 +17,20 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import static org.hamcrest.Matchers.containsString;
 import static org.junit.Assert.assertEquals;
+import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 
 import java.sql.Connection;
 import java.sql.ResultSet;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /** Tests for {@link BeamSqlEnv}. */
 public class BeamSqlEnvTest {
+  @Rule public ExpectedException exceptions = ExpectedException.none();
 
   @Test
   public void testCreateExternalTableInNestedTableProvider() throws Exception {
@@ -33,9 +38,12 @@ public class BeamSqlEnvTest {
     TestTableProvider nested = new TestTableProvider();
     TestTableProvider anotherOne = new TestTableProvider();
 
-    BeamSqlEnv env = BeamSqlEnv.withTableProvider(root);
-    env.addSchema("nested", nested);
-    env.addSchema("anotherOne", anotherOne);
+    BeamSqlEnv env =
+        BeamSqlEnv.builder()
+            .setInitializeTableProvider(root)
+            .addSchema("nested", nested)
+            .addSchema("anotherOne", anotherOne)
+            .build();
 
     Connection connection = env.connection;
     connection.createStatement().execute("CREATE EXTERNAL TABLE nested.person (id INT) TYPE test");
@@ -46,4 +54,16 @@ public class BeamSqlEnvTest {
 
     assertEquals(9, rs.getInt(1));
   }
+
+  @Test
+  public void testPlannerClassNotFound() {
+    exceptions.expect(RuntimeException.class);
+    exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
+
+    TestTableProvider root = new TestTableProvider();
+    BeamSqlEnv.builder()
+        .setInitializeTableProvider(root)
+        .setQueryPlannerClassName("org.test.ClassNotFound")
+        .build();
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index 53e05f7..74e6282 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -167,9 +167,11 @@ public class BeamDDLTest {
     TestTableProvider rootProvider = new TestTableProvider();
     TestTableProvider testProvider = new TestTableProvider();
 
-    BeamSqlEnv env = BeamSqlEnv.withTableProvider(rootProvider);
-    env.addSchema("test", testProvider);
-
+    BeamSqlEnv env =
+        BeamSqlEnv.builder()
+            .setInitializeTableProvider(rootProvider)
+            .addSchema("test", testProvider)
+            .build();
     assertNull(testProvider.getTables().get("person"));
     env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");