Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 21:40:04 UTC

[beam] Diff for: [GitHub] akedin merged pull request #7488: [SQL] Refactor JdbcDriver

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
index fa5dcb3cfcce..8d0bb9092c4f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
@@ -19,11 +19,11 @@
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
 import org.apache.calcite.linq4j.tree.Expression;
 import org.apache.calcite.rel.type.RelProtoDataType;
 import org.apache.calcite.schema.Function;
@@ -34,12 +34,12 @@
 
 /** Adapter from {@link TableProvider} to {@link Schema}. */
 public class BeamCalciteSchema implements Schema {
-  private final TableProvider tableProvider;
-  private final Map<String, String> pipelineOptions;
+  private JdbcConnection connection;
+  private TableProvider tableProvider;
 
-  public BeamCalciteSchema(TableProvider tableProvider) {
+  BeamCalciteSchema(JdbcConnection jdbcConnection, TableProvider tableProvider) {
+    this.connection = jdbcConnection;
     this.tableProvider = tableProvider;
-    this.pipelineOptions = Maps.newHashMap();
   }
 
   public TableProvider getTableProvider() {
@@ -47,7 +47,23 @@ public TableProvider getTableProvider() {
   }
 
   public Map<String, String> getPipelineOptions() {
-    return pipelineOptions;
+    return connection.getPipelineOptionsMap();
+  }
+
+  public void setPipelineOption(String key, String value) {
+    Map<String, String> options = new HashMap<>(connection.getPipelineOptionsMap());
+    options.put(key, value);
+    connection.setPipelineOptionsMap(options);
+  }
+
+  public void removePipelineOption(String key) {
+    Map<String, String> options = new HashMap<>(connection.getPipelineOptionsMap());
+    options.remove(key);
+    connection.setPipelineOptionsMap(options);
+  }
+
+  public void removeAllPipelineOptions() {
+    connection.setPipelineOptionsMap(Collections.emptyMap());
   }
 
   @Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
index dc07280558b6..80bd980b4b85 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
@@ -17,28 +17,186 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Properties;
 import java.util.ServiceLoader;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
 import org.apache.calcite.schema.Schema;
 import org.apache.calcite.schema.SchemaFactory;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Table;
 
-/** Factory that creates a {@link BeamCalciteSchema}. */
-public class BeamCalciteSchemaFactory implements SchemaFactory {
-  public static final BeamCalciteSchemaFactory INSTANCE = new BeamCalciteSchemaFactory();
+/**
+ * Factory classes that Calcite uses to create initial schema for JDBC connection.
+ *
+ * <p>The name of the factory class is passed in the connection properties into the JDBC driver
+ * {@code connect()} method. Calcite then creates the factory using the default constructor. It
+ * doesn't allow to hook into the process to pass in additional parameters (e.g. connection), so we
+ * have to add the indirection layer.
+ *
+ * <p>{@link AllProviders} is the default factory that is used by {@link JdbcDriver} unless an
+ * override is specified. It greedily finds and loads all available table providers. This is used in
+ * a normal JDBC path, e.g. when the CLI connects to {@link JdbcDriver} (without any extra connection
+ * properties).
+ *
+ * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid
+ * loading all available table providers.
+ */
+class BeamCalciteSchemaFactory {
+
+  /**
+   * Called by {@link JdbcConnection} when initializing to convert the initial empty schema to
+   * actual {@link BeamCalciteSchema}.
+   */
+  static TableProvider fromInitialEmptySchema(JdbcConnection jdbcConnection) {
+    InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema();
+    return initialEmptySchema.getTableProvider();
+  }
+
+  /**
+   * Loads all table providers using service loader. This is the default configured in {@link
+   * JdbcDriver#connect(String, Properties)}.
+   */
+  public static class AllProviders extends InitialEmptySchema implements SchemaFactory {
+
+    /**
+     * We call this in {@link #fromInitialEmptySchema(JdbcConnection)} to convert the schema created
+     * by Calcite to a configured table provider. At this point we have a connection open and can
+     * use it to configure Beam schemas, e.g. with pipeline options.
+     *
+     * <p><i>Note:</i> this loads ALL available table providers marked with
+     * {@code @AutoService(TableProvider.class)}
+     */
+    @Override
+    public TableProvider getTableProvider() {
+      MetaStore metaStore = new InMemoryMetaStore();
+      for (TableProvider provider :
+          ServiceLoader.load(TableProvider.class, getClass().getClassLoader())) {
+        metaStore.registerProvider(provider);
+      }
+      return metaStore;
+    }
+
+    /** This is what Calcite calls to create an instance of the default top level schema. */
+    @Override
+    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+      return this;
+    }
+  }
+
+  /**
+   * This is the override to create an empty schema, used in {@link
+   * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is
+   * expected to be replaced by an actual functional schema by the same code that specified this
+   * override in the first place.
+   */
+  public static class Empty extends InitialEmptySchema implements SchemaFactory {
+
+    private static final TableProvider READ_ONLY_TABLE_PROVIDER =
+        new ReadOnlyTableProvider("empty", ImmutableMap.of());
+
+    /**
+     * We call this in {@link #fromInitialEmptySchema(JdbcConnection)} to convert the schema created
+     * by Calcite to a configured table provider. This specific instance is an empty readonly
+     * provider that is supposed to be replaced by the code that specified this empty schema in the
+     * connection properties to the driver.
+     */
+    @Override
+    public TableProvider getTableProvider() {
+      return READ_ONLY_TABLE_PROVIDER;
+    }
+
+    /** This is what Calcite calls to create an instance of the top level schema. */
+    @Override
+    public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+      return this;
+    }
+  }
+
+  /**
+   * Empty schema that {@link CalciteConnection} is initialized with by Calcite using the factories
+   * above. It is not flexible enough to allow us to configure the schema during the initialization.
+   *
+   * <p>This solution with overrides and conversions allows us to handle the initialization
+   * ourselves later after Calcite has created the connection with this empty schema.
+   *
+   * <p>This is a temporary indirection used only during initial connection initialization; it allows
+   * us to later replace this empty schema in the connection with the actual, correctly configured schema.
+   */
+  public abstract static class InitialEmptySchema implements Schema {
+
+    public abstract TableProvider getTableProvider();
 
-  private BeamCalciteSchemaFactory() {}
+    @Override
+    public Table getTable(String name) {
+      return illegal();
+    }
+
+    @Override
+    public Set<String> getTableNames() {
+      return illegal();
+    }
+
+    @Override
+    public RelProtoDataType getType(String name) {
+      return illegal();
+    }
+
+    @Override
+    public Set<String> getTypeNames() {
+      return illegal();
+    }
+
+    @Override
+    public Collection<Function> getFunctions(String name) {
+      return illegal();
+    }
+
+    @Override
+    public Set<String> getFunctionNames() {
+      return illegal();
+    }
+
+    @Override
+    public Schema getSubSchema(String name) {
+      return illegal();
+    }
+
+    @Override
+    public Set<String> getSubSchemaNames() {
+      return Collections.emptySet();
+    }
+
+    @Override
+    public Expression getExpression(SchemaPlus parentSchema, String name) {
+      return illegal();
+    }
+
+    @Override
+    public boolean isMutable() {
+      return illegal();
+    }
+
+    @Override
+    public Schema snapshot(SchemaVersion version) {
+      return illegal();
+    }
 
-  @Override
-  public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
-    MetaStore metaStore = new InMemoryMetaStore();
-    for (TableProvider provider :
-        ServiceLoader.load(TableProvider.class, getClass().getClassLoader())) {
-      metaStore.registerProvider(provider);
+    @SuppressWarnings("TypeParameterUnusedInFormals")
+    private static <T> T illegal() {
+      throw new IllegalStateException("Beam JDBC connection has not been initialized");
     }
-    return new BeamCalciteSchema(metaStore);
   }
 }
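
[Editor's note: a minimal, illustrative sketch of the default "AllProviders" path described in the
javadoc above, not part of this diff. A plain JDBC connection through the driver registry picks up
the AllProviders factory, so every TableProvider annotated with @AutoService(TableProvider.class)
is loaded into the default "beam" schema. The literal-only query assumes a runner such as the
DirectRunner is on the classpath; the class and query are made up for illustration.]

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DefaultJdbcPathExample {
  public static void main(String[] args) throws Exception {
    // "jdbc:beam:" is JdbcDriver.CONNECT_STRING_PREFIX; with no extra properties the
    // AllProviders schema factory and the default "beam" schema are used.
    try (Connection connection = DriverManager.getConnection("jdbc:beam:");
        Statement statement = connection.createStatement();
        // A literal-only query, so no table provider is actually required to run it.
        ResultSet resultSet = statement.executeQuery("SELECT 'hello' AS greeting")) {
      while (resultSet.next()) {
        System.out.println(resultSet.getString(1));
      }
    }
  }
}
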
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
index ed6b48046da7..65b2f1556555 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
@@ -22,7 +22,6 @@
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.Contexts;
 import org.apache.calcite.plan.ConventionTraitDef;
@@ -57,7 +56,7 @@
 
   private final FrameworkConfig config;
 
-  BeamQueryPlanner(CalciteConnection connection) {
+  BeamQueryPlanner(JdbcConnection connection) {
     final CalciteConnectionConfig config = connection.config();
     final SqlParser.ConfigBuilder parserConfig =
         SqlParser.configBuilder()
@@ -73,7 +72,7 @@
     }
 
     final SchemaPlus schema = connection.getRootSchema();
-    final SchemaPlus defaultSchema = JdbcDriver.getDefaultSchema(connection);
+    final SchemaPlus defaultSchema = connection.getCurrentSchemaPlus();
 
     final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE);
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 1760cf78984b..5841efb64f8c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -33,11 +33,8 @@
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.calcite.jdbc.CalciteConnection;
 import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.sql.SqlExecutableStatement;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.tools.RelConversionException;
@@ -50,13 +47,11 @@
 @Internal
 @Experimental
 public class BeamSqlEnv {
-  final CalciteConnection connection;
-  final SchemaPlus defaultSchema;
+  final JdbcConnection connection;
   final BeamQueryPlanner planner;
 
   private BeamSqlEnv(TableProvider tableProvider) {
     connection = JdbcDriver.connect(tableProvider);
-    defaultSchema = JdbcDriver.getDefaultSchema(connection);
     planner = new BeamQueryPlanner(connection);
   }
 
@@ -80,14 +75,14 @@ public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
   private void registerBuiltinUdf(Map<String, List<Method>> methods) {
     for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
       for (Method method : entry.getValue()) {
-        defaultSchema.add(entry.getKey(), UdfImpl.create(method));
+        connection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method));
       }
     }
   }
 
   /** Register a UDF function which can be used in SQL expression. */
   public void registerUdf(String functionName, Class<?> clazz, String method) {
-    defaultSchema.add(functionName, UdfImpl.create(clazz, method));
+    connection.getCurrentSchemaPlus().add(functionName, UdfImpl.create(clazz, method));
   }
 
   /** Register a UDF function which can be used in SQL expression. */
@@ -108,7 +103,7 @@ public void registerUdf(String functionName, SerializableFunction sfn) {
    * org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
    */
   public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
-    defaultSchema.add(functionName, new UdafImpl(combineFn));
+    connection.getCurrentSchemaPlus().add(functionName, new UdafImpl(combineFn));
   }
 
   /** Load all UDF/UDAF from {@link UdfUdafProvider}. */
@@ -160,7 +155,7 @@ public void executeDdl(String sqlStatement) throws ParseException {
   }
 
   public Map<String, String> getPipelineOptions() {
-    return ((BeamCalciteSchema) CalciteSchema.from(defaultSchema).schema).getPipelineOptions();
+    return connection.getPipelineOptionsMap();
   }
 
   public String explain(String sqlString) throws ParseException {
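
[Editor's note: since the UDF/UDAF registration methods above now resolve the schema through the
connection, here is a small illustrative sketch of how they are typically called, not part of this
diff. The CubicInteger class is a made-up example UDF.]

import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;

public class UdfRegistrationExample {

  /** Hypothetical UDF: a public method exposed under a SQL function name. */
  public static class CubicInteger {
    public static Integer cubic(Integer input) {
      return input * input * input;
    }
  }

  public static void main(String[] args) {
    // BeamSqlEnv.inMemory(...) connects via JdbcDriver.connect(TableProvider), so the
    // registration below ends up in connection.getCurrentSchemaPlus().add(...).
    BeamSqlEnv env = BeamSqlEnv.inMemory();
    env.registerUdf("CUBIC", CubicInteger.class, "cubic");
    // The function can now be referenced in queries parsed by this environment,
    // e.g. SELECT CUBIC(f_int) FROM PCOLLECTION (assuming such a table exists).
  }
}
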
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteConnectionWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteConnectionWrapper.java
new file mode 100644
index 000000000000..e376ae5d0c48
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteConnectionWrapper.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.lang.reflect.Type;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.schema.SchemaPlus;
+
+/**
+ * Abstract wrapper for {@link CalciteConnection} to simplify extension.
+ *
+ * <p>The Calcite JDBC model lacks a convenient Connection class to extend that would also be supported by
+ * its factories without significant copy-pasting.
+ *
+ * <p>The purpose of this class is to hide the delegation logic from the child classes ({@link
+ * JdbcConnection}) to make them cleaner and easier to read. It has no functional significance.
+ *
+ * <p>This class only delegates to the underlying {@link CalciteConnection}; all added or modified
+ * functionality should go into subclasses.
+ *
+ * <p>Ultimately a patch to Calcite can be made to simplify this logic.
+ */
+public abstract class CalciteConnectionWrapper implements CalciteConnection {
+  private CalciteConnection connection;
+
+  protected CalciteConnectionWrapper(CalciteConnection connection) {
+    this.connection = connection;
+  }
+
+  protected CalciteConnection connection() {
+    return connection;
+  }
+
+  @Override
+  public SchemaPlus getRootSchema() {
+    return connection.getRootSchema();
+  }
+
+  @Override
+  public JavaTypeFactory getTypeFactory() {
+    return connection.getTypeFactory();
+  }
+
+  @Override
+  public Properties getProperties() {
+    return connection.getProperties();
+  }
+
+  @Override
+  public Statement createStatement() throws SQLException {
+    return connection.createStatement();
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql) throws SQLException {
+    return connection.prepareStatement(sql);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql) throws SQLException {
+    return connection.prepareCall(sql);
+  }
+
+  @Override
+  public String nativeSQL(String sql) throws SQLException {
+    return connection.nativeSQL(sql);
+  }
+
+  @Override
+  public void setAutoCommit(boolean autoCommit) throws SQLException {
+    connection.setAutoCommit(autoCommit);
+  }
+
+  @Override
+  public boolean getAutoCommit() throws SQLException {
+    return connection.getAutoCommit();
+  }
+
+  @Override
+  public void commit() throws SQLException {
+    connection.commit();
+  }
+
+  @Override
+  public void rollback() throws SQLException {
+    connection.rollback();
+  }
+
+  @Override
+  public void close() throws SQLException {
+    connection.close();
+  }
+
+  @Override
+  public boolean isClosed() throws SQLException {
+    return connection.isClosed();
+  }
+
+  @Override
+  public DatabaseMetaData getMetaData() throws SQLException {
+    return connection.getMetaData();
+  }
+
+  @Override
+  public void setReadOnly(boolean readOnly) throws SQLException {
+    connection.setReadOnly(readOnly);
+  }
+
+  @Override
+  public boolean isReadOnly() throws SQLException {
+    return connection.isReadOnly();
+  }
+
+  @Override
+  public void setCatalog(String catalog) throws SQLException {
+    connection.setCatalog(catalog);
+  }
+
+  @Override
+  public String getCatalog() throws SQLException {
+    return connection.getCatalog();
+  }
+
+  @Override
+  public void setTransactionIsolation(int level) throws SQLException {
+    connection.setTransactionIsolation(level);
+  }
+
+  @Override
+  public int getTransactionIsolation() throws SQLException {
+    return connection.getTransactionIsolation();
+  }
+
+  @Override
+  public SQLWarning getWarnings() throws SQLException {
+    return connection.getWarnings();
+  }
+
+  @Override
+  public void clearWarnings() throws SQLException {
+    connection.clearWarnings();
+  }
+
+  @Override
+  public Statement createStatement(int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return connection.createStatement(resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return connection.prepareStatement(sql, resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
+      throws SQLException {
+    return connection.prepareCall(sql, resultSetType, resultSetConcurrency);
+  }
+
+  @Override
+  public Map<String, Class<?>> getTypeMap() throws SQLException {
+    return connection.getTypeMap();
+  }
+
+  @Override
+  public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+    connection.setTypeMap(map);
+  }
+
+  @Override
+  public void setHoldability(int holdability) throws SQLException {
+    connection.setHoldability(holdability);
+  }
+
+  @Override
+  public int getHoldability() throws SQLException {
+    return connection.getHoldability();
+  }
+
+  @Override
+  public Savepoint setSavepoint() throws SQLException {
+    return connection.setSavepoint();
+  }
+
+  @Override
+  public Savepoint setSavepoint(String name) throws SQLException {
+    return connection.setSavepoint(name);
+  }
+
+  @Override
+  public void rollback(Savepoint savepoint) throws SQLException {
+    connection.rollback(savepoint);
+  }
+
+  @Override
+  public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+    connection.releaseSavepoint(savepoint);
+  }
+
+  @Override
+  public Statement createStatement(
+      int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+    return connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(
+      String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+      throws SQLException {
+    return connection.prepareStatement(
+        sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public CallableStatement prepareCall(
+      String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+      throws SQLException {
+    return connection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+    return connection.prepareStatement(sql, autoGeneratedKeys);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+    return connection.prepareStatement(sql, columnIndexes);
+  }
+
+  @Override
+  public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+    return connection.prepareStatement(sql, columnNames);
+  }
+
+  @Override
+  public Clob createClob() throws SQLException {
+    return connection.createClob();
+  }
+
+  @Override
+  public Blob createBlob() throws SQLException {
+    return connection.createBlob();
+  }
+
+  @Override
+  public NClob createNClob() throws SQLException {
+    return connection.createNClob();
+  }
+
+  @Override
+  public SQLXML createSQLXML() throws SQLException {
+    return connection.createSQLXML();
+  }
+
+  @Override
+  public boolean isValid(int timeout) throws SQLException {
+    return connection.isValid(timeout);
+  }
+
+  @Override
+  public void setClientInfo(String name, String value) throws SQLClientInfoException {
+    connection.setClientInfo(name, value);
+  }
+
+  @Override
+  public void setClientInfo(Properties properties) throws SQLClientInfoException {
+    connection.setClientInfo(properties);
+  }
+
+  @Override
+  public String getClientInfo(String name) throws SQLException {
+    return connection.getClientInfo(name);
+  }
+
+  @Override
+  public Properties getClientInfo() throws SQLException {
+    return connection.getClientInfo();
+  }
+
+  @Override
+  public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+    return connection.createArrayOf(typeName, elements);
+  }
+
+  @Override
+  public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+    return connection.createStruct(typeName, attributes);
+  }
+
+  @Override
+  public void setSchema(String schema) throws SQLException {
+    connection.setSchema(schema);
+  }
+
+  @Override
+  public String getSchema() throws SQLException {
+    return connection.getSchema();
+  }
+
+  @Override
+  public void abort(Executor executor) throws SQLException {
+    connection.abort(executor);
+  }
+
+  @Override
+  public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+    connection.setNetworkTimeout(executor, milliseconds);
+  }
+
+  @Override
+  public int getNetworkTimeout() throws SQLException {
+    return connection.getNetworkTimeout();
+  }
+
+  @Override
+  public CalciteConnectionConfig config() {
+    return connection.config();
+  }
+
+  @Override
+  public CalcitePrepare.Context createPrepareContext() {
+    return connection.createPrepareContext();
+  }
+
+  @Override
+  public <T> T unwrap(Class<T> iface) throws SQLException {
+    return connection.unwrap(iface);
+  }
+
+  @Override
+  public boolean isWrapperFor(Class<?> iface) throws SQLException {
+    return connection.isWrapperFor(iface);
+  }
+
+  @Override
+  public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) {
+    return connection.createQuery(expression, rowType);
+  }
+
+  @Override
+  public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
+    return connection.createQuery(expression, rowType);
+  }
+
+  @Override
+  public <T> T execute(Expression expression, Class<T> type) {
+    return connection.execute(expression, type);
+  }
+
+  @Override
+  public Object execute(Expression expression, Type type) {
+    return connection.execute(expression, type);
+  }
+
+  @Override
+  public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+    return connection.executeQuery(queryable);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
new file mode 100644
index 000000000000..a4a7f479c10b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import com.google.common.collect.ImmutableMap;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+
+/**
+ * Beam JDBC Connection.
+ *
+ * <p>Implements and delegates to {@link CalciteConnection}, adds Beam-specific helper methods.
+ * {@link BeamCalciteSchema BeamCalciteSchemas} keep reference to this connection. Pipeline options
+ * are stored here.
+ */
+public class JdbcConnection extends CalciteConnectionWrapper {
+  /**
+   * Connection string parameters that begin with {@code "beam."} will be interpreted as {@link
+   * PipelineOptions}.
+   */
+  private static final String PIPELINE_OPTION_PREFIX = "beam.";
+
+  private Map<String, String> pipelineOptionsMap;
+
+  private JdbcConnection(CalciteConnection connection) throws SQLException {
+    super(connection);
+    this.pipelineOptionsMap = Collections.emptyMap();
+  }
+
+  /**
+   * Wraps and initializes the initial connection created by Calcite.
+   *
+   * <p>Sets the pipeline options, replaces the initial non-functional top-level schema with schema
+   * created by {@link BeamCalciteSchemaFactory}.
+   */
+  static @Nullable JdbcConnection initialize(CalciteConnection connection) throws SQLException {
+    if (connection == null) {
+      return null;
+    }
+
+    JdbcConnection jdbcConnection = new JdbcConnection(connection);
+    jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
+    jdbcConnection.setSchema(
+        connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection));
+    return jdbcConnection;
+  }
+
+  /**
+   * Reads the connection properties starting with {@link #PIPELINE_OPTION_PREFIX} and converts them
+   * to a map of pipeline options.
+   */
+  private static Map<String, String> extractPipelineOptions(CalciteConnection calciteConnection) {
+    return calciteConnection
+        .getProperties()
+        .entrySet()
+        .stream()
+        .map(entry -> KV.of(entry.getKey().toString(), entry.getValue().toString()))
+        .filter(kv -> kv.getKey().startsWith(PIPELINE_OPTION_PREFIX))
+        .map(kv -> KV.of(kv.getKey().substring(PIPELINE_OPTION_PREFIX.length()), kv.getValue()))
+        .collect(Collectors.toMap(KV::getKey, KV::getValue));
+  }
+
+  Map<String, String> getPipelineOptionsMap() {
+    return pipelineOptionsMap;
+  }
+
+  /**
+   * Only called from the {@link BeamCalciteSchema}. This is needed to enable the {@code SET
+   * pipelineOption = blah} syntax.
+   */
+  public void setPipelineOptionsMap(Map<String, String> pipelineOptionsMap) {
+    this.pipelineOptionsMap = ImmutableMap.copyOf(pipelineOptionsMap);
+  }
+
+  /** Get the current default schema from the root schema. */
+  @SuppressWarnings("TypeParameterUnusedInFormals")
+  <T> T getCurrentBeamSchema() {
+    try {
+      return (T) CalciteSchema.from(getRootSchema().getSubSchema(getSchema())).schema;
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Calcite-created {@link SchemaPlus} wrapper for the current schema. */
+  SchemaPlus getCurrentSchemaPlus() {
+    try {
+      return getRootSchema().getSubSchema(getSchema());
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Sets the top-level schema '{@code name}' to '{@code tableProvider}'.
+   *
+   * <p>Overrides the schema if it exists.
+   */
+  void setSchema(String name, TableProvider tableProvider) {
+    BeamCalciteSchema beamCalciteSchema = new BeamCalciteSchema(this, tableProvider);
+    SchemaPlus addedSchemaPlus = getRootSchema().add(name, beamCalciteSchema);
+    addedSchemaPlus.setCacheEnabled(false);
+  }
+}
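
[Editor's note: to make the "beam." prefix convention above concrete, an illustrative sketch, not
part of this diff. Connection properties prefixed with "beam." are stripped of the prefix by
extractPipelineOptions(...) and stored as the connection's pipeline options; "runner" and
"tempLocation" are just example PipelineOptions fields.]

import java.sql.Connection;
import java.util.Properties;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;

public class BeamPrefixedPropertiesExample {
  public static void main(String[] args) throws Exception {
    Properties properties = new Properties();
    properties.setProperty("beam.runner", "DirectRunner");     // becomes pipeline option "runner"
    properties.setProperty("beam.tempLocation", "/tmp/beam");  // becomes pipeline option "tempLocation"

    // Connect through the driver instance directly; JdbcConnection.initialize(...) reads
    // the properties above while wrapping the underlying CalciteConnection.
    try (Connection connection =
        JdbcDriver.INSTANCE.connect(JdbcDriver.CONNECT_STRING_PREFIX, properties)) {
      System.out.println("Connected; pipeline options were taken from the properties");
    }
  }
}
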
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index ce656ff73346..0cf8423ffa63 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -17,6 +17,12 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
+import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE;
+import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
+import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM;
 import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
 
 import com.google.auto.service.AutoService;
@@ -24,23 +30,18 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.function.Consumer;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
 import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.calcite.avatica.BuiltInConnectionProperty;
-import org.apache.calcite.avatica.ConnectStringParser;
 import org.apache.calcite.avatica.ConnectionProperty;
-import org.apache.calcite.config.CalciteConnectionProperty;
 import org.apache.calcite.config.Lex;
 import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.jdbc.Driver;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
@@ -50,7 +51,6 @@
 import org.apache.calcite.rel.rules.CalcRemoveRule;
 import org.apache.calcite.rel.rules.SortRemoveRule;
 import org.apache.calcite.runtime.Hook;
-import org.apache.calcite.schema.SchemaPlus;
 import org.apache.calcite.tools.RuleSet;
 
 /**
@@ -66,14 +66,7 @@
 public class JdbcDriver extends Driver {
   public static final JdbcDriver INSTANCE = new JdbcDriver();
   public static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
-
-  /**
-   * Querystring parameters that begin with {@code "beam."} will be interpreted as {@link
-   * PipelineOptions}.
-   */
-  public static final String BEAM_QUERYSTRING_PREFIX = "beam.";
-
-  private static final String BEAM_CALCITE_SCHEMA = "beamCalciteSchema";
+  static final String TOP_LEVEL_BEAM_SCHEMA = "beam";
 
   static {
     ClassLoader origLoader = Thread.currentThread().getContextClassLoader();
@@ -120,79 +113,89 @@ protected String getConnectStringPrefix() {
     return CONNECT_STRING_PREFIX;
   }
 
+  /**
+   * Configures Beam-specific options and opens a JDBC connection to Calcite.
+   *
+   * <p>If {@code originalConnectionProperties} doesn't have the Beam-specific properties, this method
+   * populates them with defaults (e.g. sets the default schema name to "beam").
+   *
+   * <p>Returns null if {@code url} doesn't begin with {@link #CONNECT_STRING_PREFIX}. This seems to
+   * be how JDBC decides whether a driver can handle a request. It tries to connect to it, and if
+   * the result is null it picks another driver.
+   *
+   * <p>Returns an instance of {@link JdbcConnection} which is a Beam wrapper around {@link
+   * CalciteConnection}.
+   */
   @Override
-  public Connection connect(String url, Properties info) throws SQLException {
+  public Connection connect(String url, Properties originalConnectionProperties)
+      throws SQLException {
+
+    // do this check before even looking into properties
+    // do not remove this, please
     if (!acceptsURL(url)) {
       return null;
     }
 
-    final BeamCalciteSchema beamCalciteSchema = (BeamCalciteSchema) info.get(BEAM_CALCITE_SCHEMA);
-
-    Properties info2 = new Properties(info);
-    setDefault(info2, BuiltInConnectionProperty.TIME_ZONE, "UTC");
-    setDefault(info2, CalciteConnectionProperty.LEX, Lex.JAVA.name());
-    setDefault(
-        info2,
-        CalciteConnectionProperty.PARSER_FACTORY,
-        BeamSqlParserImpl.class.getName() + "#FACTORY");
-    setDefault(info2, CalciteConnectionProperty.TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
-    setDefault(info2, CalciteConnectionProperty.SCHEMA, "beam");
-    setDefault(
-        info2, CalciteConnectionProperty.SCHEMA_FACTORY, BeamCalciteSchemaFactory.class.getName());
-
-    CalciteConnection connection = (CalciteConnection) super.connect(url, info2);
-    final SchemaPlus defaultSchema;
-    if (beamCalciteSchema != null) {
-      defaultSchema =
-          connection.getRootSchema().add(connection.config().schema(), beamCalciteSchema);
-      connection.setSchema(defaultSchema.getName());
-    } else {
-      defaultSchema = getDefaultSchema(connection);
-    }
+    Properties connectionProps = ensureDefaultProperties(originalConnectionProperties);
+    CalciteConnection calciteConnection = (CalciteConnection) super.connect(url, connectionProps);
 
-    // Beam schema may change without notifying Calcite
-    defaultSchema.setCacheEnabled(false);
-
-    // Set default PipelineOptions to which we apply the querystring
-    Map<String, String> pipelineOptionsMap =
-        ((BeamCalciteSchema) CalciteSchema.from(defaultSchema).schema).getPipelineOptions();
-    ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
-    pipelineOptionsMap.put("userAgent", String.format("BeamSQL/%s", releaseInfo.getVersion()));
-
-    String querystring = url.substring(CONNECT_STRING_PREFIX.length());
-    for (Map.Entry<Object, Object> propertyValue :
-        ConnectStringParser.parse(querystring).entrySet()) {
-      String name = (String) propertyValue.getKey();
-      if (name.startsWith(BEAM_QUERYSTRING_PREFIX)) {
-        pipelineOptionsMap.put(
-            name.substring(BEAM_QUERYSTRING_PREFIX.length()), (String) propertyValue.getValue());
-      }
-    }
-    return connection;
+    // calciteConnection is initialized with an empty Beam schema,
+    // we need to populate it with pipeline options, load table providers, etc
+    return JdbcConnection.initialize(calciteConnection);
+  }
+
+  /**
+   * Make sure required default properties are set.
+   *
+   * <p>Among other things sets up the parser class name, rel data type system and default schema
+   * factory.
+   *
+   * <p>The specified Beam schema factory will be used by Calcite to create the initial top level
+   * Beam schema. It can be later overridden by setting the schema via {@link
+   * JdbcConnection#setSchema(String, TableProvider)}.
+   */
+  private Properties ensureDefaultProperties(Properties originalInfo) {
+    Properties info = new Properties();
+    info.putAll(originalInfo);
+
+    setIfNull(info, TIME_ZONE, "UTC");
+    setIfNull(info, LEX, Lex.JAVA.name());
+    setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY");
+    setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
+    setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA);
+    setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName());
+
+    info.put("beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion());
+
+    return info;
   }
 
-  private static void setDefault(Properties info, ConnectionProperty key, String value) {
+  private static void setIfNull(Properties info, ConnectionProperty key, String value) {
     // A null value indicates the default. We want to override defaults only.
     if (info.getProperty(key.camelName()) == null) {
       info.setProperty(key.camelName(), value);
     }
   }
 
-  @VisibleForTesting
-  public static CalciteConnection connect(TableProvider tableProvider) {
-    try {
-      Properties info = new Properties();
-      info.put(BEAM_CALCITE_SCHEMA, new BeamCalciteSchema(tableProvider));
-      return (CalciteConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, info);
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static SchemaPlus getDefaultSchema(CalciteConnection connection) {
+  /**
+   * Connects to the driver using standard {@link #connect(String, Properties)} call, but overrides
+   * the initial schema factory. Default factory would load up all table providers. The one
+   * specified here doesn't load any providers. We then override the top-level schema with the
+   * {@code tableProvider}.
+   *
+   * <p>This is called in tests and {@link BeamSqlEnv}, core part of {@link SqlTransform}. CLI uses
+   * standard JDBC driver registry, and goes through {@link #connect(String, Properties)} instead,
+   * not this path. The CLI ends up using the schema factory that populates the default schema with
+   * all table providers it can find. See {@link BeamCalciteSchemaFactory}.
+   */
+  public static JdbcConnection connect(TableProvider tableProvider) {
     try {
-      String defaultSchemaName = connection.getSchema();
-      return connection.getRootSchema().getSubSchema(defaultSchemaName);
+      Properties properties = new Properties();
+      setIfNull(properties, SCHEMA_FACTORY, BeamCalciteSchemaFactory.Empty.class.getName());
+      JdbcConnection connection =
+          (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
+      connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
+      return connection;
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
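
[Editor's note: a short sketch of the programmatic path described in the javadoc above, not part of
this diff. The empty ReadOnlyTableProvider mirrors the one used by the Empty schema factory and is
only a placeholder for a real provider.]

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;

public class ProgrammaticConnectExample {
  public static void main(String[] args) throws Exception {
    // A provider with no tables, purely for illustration.
    ReadOnlyTableProvider tableProvider = new ReadOnlyTableProvider("example", ImmutableMap.of());

    // Uses the Empty schema factory internally, then replaces the top-level "beam"
    // schema with the provider supplied here; no ServiceLoader scan happens.
    try (JdbcConnection connection = JdbcDriver.connect(tableProvider)) {
      System.out.println(connection.getSchema()); // expected to print "beam"
    }
  }
}
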
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
index a0c05d6841f2..7314305f4a3f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
@@ -19,7 +19,6 @@
 
 import static org.apache.calcite.util.Static.RESOURCE;
 
-import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
 import org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.calcite.jdbc.CalciteSchema;
@@ -48,21 +47,15 @@ public void execute(CalcitePrepare.Context context) {
           name.getParserPosition(),
           RESOURCE.internal("Schema is not instanceof BeamCalciteSchema"));
     }
+
     BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema;
-    Map<String, String> options = schema.getPipelineOptions();
-    if (options == null) {
-      throw SqlUtil.newContextException(
-          name.getParserPosition(),
-          RESOURCE.internal("PipelineOptions not accessible via BeamCalciteSchema"));
-    }
-    if (value == null) {
-      if ("ALL".equals(pair.right)) {
-        options.clear();
-      } else {
-        options.remove(pair.right);
-      }
+
+    if (value != null) {
+      schema.setPipelineOption(pair.right, SqlDdlNodes.getString(value));
+    } else if ("ALL".equals(pair.right)) {
+      schema.removeAllPipelineOptions();
     } else {
-      options.put(pair.right, SqlDdlNodes.getString(value));
+      schema.removePipelineOption(pair.right);
     }
   }
 }
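
[Editor's note: a usage illustration of the SET handling above, not part of this diff. The parsed
SqlSetOption reaches SqlSetOptionBeam.execute(...), which now goes through
BeamCalciteSchema.setPipelineOption(...); the option name and value below are only examples.]

import java.sql.Connection;
import java.sql.Statement;
import java.util.Properties;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;

public class SetPipelineOptionExample {
  public static void main(String[] args) throws Exception {
    try (Connection connection =
            JdbcDriver.INSTANCE.connect(JdbcDriver.CONNECT_STRING_PREFIX, new Properties());
        Statement statement = connection.createStatement()) {
      // Stores pipeline option "runner" on the JdbcConnection via the Beam schema.
      statement.execute("SET runner = 'DirectRunner'");
    }
  }
}
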
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index cfbcb7c00378..e1a47d13c55e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -19,6 +19,7 @@
 
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 
 /**
@@ -26,6 +27,10 @@
  *
  * <p>So there will be a provider to handle textfile(CSV) based tables, there is a provider to
  * handle MySQL based tables, a provider to handle Casandra based tables etc.
+ *
+ * <p><i>Note:</i> all implementations marked with {@code @AutoService(TableProvider.class)} are
+ * automatically loaded by CLI or other cases when {@link JdbcDriver} is used with default
+ * connection parameters.
  */
 public interface TableProvider {
   /** Gets the table type this provider handles. */
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index bc6d93d93087..062700ba0fe6 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -30,7 +30,6 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -39,8 +38,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -339,24 +338,20 @@ private CalciteConnection connect(PipelineOptions options, TableProvider... tabl
       throws SQLException {
     // HACK: PipelineOptions should expose a prominent method to do this reliably
     // The actual options are in the "options" field of the converted map
-    Map<String, Object> optionsMap =
-        (Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options");
     Map<String, String> argsMap =
-        optionsMap
+        ((Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options"))
             .entrySet()
             .stream()
-            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> toArg(entry.getValue())));
+            .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
 
     InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
     for (TableProvider tableProvider : tableProviders) {
       inMemoryMetaStore.registerProvider(tableProvider);
     }
 
-    Properties info = new Properties();
-    BeamCalciteSchema dbSchema = new BeamCalciteSchema(inMemoryMetaStore);
-    dbSchema.getPipelineOptions().putAll(argsMap);
-    info.put(BEAM_CALCITE_SCHEMA, dbSchema);
-    return (CalciteConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, info);
+    JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore);
+    connection.setPipelineOptionsMap(argsMap);
+    return connection;
   }
 
   private static Boolean containsAll(Set<PubsubMessage> set, PubsubMessage... subsetCandidate) {


With regards,
Apache Git Services