You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/14 21:40:04 UTC
[beam] Diff for: [GitHub] akedin merged pull request #7488: [SQL] Refactor
JdbcDriver
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
index fa5dcb3cfcce..8d0bb9092c4f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java
@@ -19,11 +19,11 @@
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.Function;
@@ -34,12 +34,12 @@
/** Adapter from {@link TableProvider} to {@link Schema}. */
public class BeamCalciteSchema implements Schema {
- private final TableProvider tableProvider;
- private final Map<String, String> pipelineOptions;
+ private JdbcConnection connection;
+ private TableProvider tableProvider;
- public BeamCalciteSchema(TableProvider tableProvider) {
+ BeamCalciteSchema(JdbcConnection jdbcConnection, TableProvider tableProvider) {
+ this.connection = jdbcConnection;
this.tableProvider = tableProvider;
- this.pipelineOptions = Maps.newHashMap();
}
public TableProvider getTableProvider() {
@@ -47,7 +47,23 @@ public TableProvider getTableProvider() {
}
public Map<String, String> getPipelineOptions() {
- return pipelineOptions;
+ return connection.getPipelineOptionsMap();
+ }
+
+ public void setPipelineOption(String key, String value) {
+ Map<String, String> options = new HashMap<>(connection.getPipelineOptionsMap());
+ options.put(key, value);
+ connection.setPipelineOptionsMap(options);
+ }
+
+ public void removePipelineOption(String key) {
+ Map<String, String> options = new HashMap<>(connection.getPipelineOptionsMap());
+ options.remove(key);
+ connection.setPipelineOptionsMap(options);
+ }
+
+ public void removeAllPipelineOptions() {
+ connection.setPipelineOptionsMap(Collections.emptyMap());
}
@Override
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
index dc07280558b6..80bd980b4b85 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
@@ -17,28 +17,186 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
+import java.util.Properties;
import java.util.ServiceLoader;
+import java.util.Set;
+import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.rel.type.RelProtoDataType;
+import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.SchemaVersion;
+import org.apache.calcite.schema.Table;
-/** Factory that creates a {@link BeamCalciteSchema}. */
-public class BeamCalciteSchemaFactory implements SchemaFactory {
- public static final BeamCalciteSchemaFactory INSTANCE = new BeamCalciteSchemaFactory();
+/**
+ * Factory classes that Calcite uses to create initial schema for JDBC connection.
+ *
+ * <p>The name of the factory class is passed in the connection properties into the JDBC driver
+ * {@code connect()} method. Calcite then creates the factory using the default constructor. It
+ * doesn't allow hooking into the process to pass in additional parameters (e.g. connection), so we
+ * have to add the indirection layer.
+ *
+ * <p>{@link AllProviders} is the default factory that is used by {@link JdbcDriver} unless an
+ * override is specified. It greedily finds and loads all available table providers. This is used in
+ * a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection
+ * properties).
+ *
+ * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid
+ * loading all available table providers.
+ */
+class BeamCalciteSchemaFactory {
+
+ /**
+ * Called by {@link JdbcConnection} when initializing to convert the initial empty schema to
+ * actual {@link BeamCalciteSchema}.
+ */
+ static TableProvider fromInitialEmptySchema(JdbcConnection jdbcConnection) {
+ InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema();
+ return initialEmptySchema.getTableProvider();
+ }
+
+ /**
+ * Loads all table providers using service loader. This is the default configured in {@link
+ * JdbcDriver#connect(String, Properties)}.
+ */
+ public static class AllProviders extends InitialEmptySchema implements SchemaFactory {
+
+ /**
+ * We call this in {@link #fromInitialEmptySchema(JdbcConnection)} to convert the schema created
+ * by Calcite to a configured table provider. At this point we have a connection open and can
+ * use it to configure Beam schemas, e.g. with pipeline options.
+ *
+ * <p><i>Note:</i> this loads ALL available table providers marked with
+ * {@code @AutoService(TableProvider.class)}
+ */
+ @Override
+ public TableProvider getTableProvider() {
+ MetaStore metaStore = new InMemoryMetaStore();
+ for (TableProvider provider :
+ ServiceLoader.load(TableProvider.class, getClass().getClassLoader())) {
+ metaStore.registerProvider(provider);
+ }
+ return metaStore;
+ }
+
+ /** This is what Calcite calls to create an instance of the default top level schema. */
+ @Override
+ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+ return this;
+ }
+ }
+
+ /**
+ * This is the override to create an empty schema, used in {@link
+ * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is
+ * expected to be replaced by an actual functional schema by the same code that specified this
+ * override in the first place.
+ */
+ public static class Empty extends InitialEmptySchema implements SchemaFactory {
+
+ private static final TableProvider READ_ONLY_TABLE_PROVIDER =
+ new ReadOnlyTableProvider("empty", ImmutableMap.of());
+
+ /**
+ * We call this in {@link #fromInitialEmptySchema(JdbcConnection)} to convert the schema created
+ * by Calcite to a configured table provider. This specific instance is an empty readonly
+ * provider that is supposed to be replaced by the code that specified this empty schema in the
+ * connection properties to the driver.
+ */
+ @Override
+ public TableProvider getTableProvider() {
+ return READ_ONLY_TABLE_PROVIDER;
+ }
+
+ /** This is what Calcite calls to create an instance of the top level schema. */
+ @Override
+ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
+ return this;
+ }
+ }
+
+ /**
+ * Empty schema that {@link CalciteConnection} is initialized with by Calcite using the factories
+ * above. Calcite is not flexible enough to allow us to configure the schema during initialization.
+ *
+ * <p>This solution with overrides and conversions allows us to handle the initialization
+ * ourselves later after Calcite has created the connection with this empty schema.
+ *
+ * <p>This is a temporary indirection for initial connection initialization; it allows us to later
+ * replace this empty schema in the connection with the actual correctly configured schema.
+ */
+ public abstract static class InitialEmptySchema implements Schema {
+
+ public abstract TableProvider getTableProvider();
- private BeamCalciteSchemaFactory() {}
+ @Override
+ public Table getTable(String name) {
+ return illegal();
+ }
+
+ @Override
+ public Set<String> getTableNames() {
+ return illegal();
+ }
+
+ @Override
+ public RelProtoDataType getType(String name) {
+ return illegal();
+ }
+
+ @Override
+ public Set<String> getTypeNames() {
+ return illegal();
+ }
+
+ @Override
+ public Collection<Function> getFunctions(String name) {
+ return illegal();
+ }
+
+ @Override
+ public Set<String> getFunctionNames() {
+ return illegal();
+ }
+
+ @Override
+ public Schema getSubSchema(String name) {
+ return illegal();
+ }
+
+ @Override
+ public Set<String> getSubSchemaNames() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public Expression getExpression(SchemaPlus parentSchema, String name) {
+ return illegal();
+ }
+
+ @Override
+ public boolean isMutable() {
+ return illegal();
+ }
+
+ @Override
+ public Schema snapshot(SchemaVersion version) {
+ return illegal();
+ }
- @Override
- public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
- MetaStore metaStore = new InMemoryMetaStore();
- for (TableProvider provider :
- ServiceLoader.load(TableProvider.class, getClass().getClassLoader())) {
- metaStore.registerProvider(provider);
+ @SuppressWarnings("TypeParameterUnusedInFormals")
+ private static <T> T illegal() {
+ throw new IllegalStateException("Beam JDBC connection has not been initialized");
}
- return new BeamCalciteSchema(metaStore);
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
index ed6b48046da7..65b2f1556555 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamQueryPlanner.java
@@ -22,7 +22,6 @@
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.calcite.config.CalciteConnectionConfig;
-import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.Contexts;
import org.apache.calcite.plan.ConventionTraitDef;
@@ -57,7 +56,7 @@
private final FrameworkConfig config;
- BeamQueryPlanner(CalciteConnection connection) {
+ BeamQueryPlanner(JdbcConnection connection) {
final CalciteConnectionConfig config = connection.config();
final SqlParser.ConfigBuilder parserConfig =
SqlParser.configBuilder()
@@ -73,7 +72,7 @@
}
final SchemaPlus schema = connection.getRootSchema();
- final SchemaPlus defaultSchema = JdbcDriver.getDefaultSchema(connection);
+ final SchemaPlus defaultSchema = connection.getCurrentSchemaPlus();
final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 1760cf78984b..5841efb64f8c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -33,11 +33,8 @@
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.jdbc.CalcitePrepare;
-import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.plan.RelOptUtil;
-import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlExecutableStatement;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.RelConversionException;
@@ -50,13 +47,11 @@
@Internal
@Experimental
public class BeamSqlEnv {
- final CalciteConnection connection;
- final SchemaPlus defaultSchema;
+ final JdbcConnection connection;
final BeamQueryPlanner planner;
private BeamSqlEnv(TableProvider tableProvider) {
connection = JdbcDriver.connect(tableProvider);
- defaultSchema = JdbcDriver.getDefaultSchema(connection);
planner = new BeamQueryPlanner(connection);
}
@@ -80,14 +75,14 @@ public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
private void registerBuiltinUdf(Map<String, List<Method>> methods) {
for (Map.Entry<String, List<Method>> entry : methods.entrySet()) {
for (Method method : entry.getValue()) {
- defaultSchema.add(entry.getKey(), UdfImpl.create(method));
+ connection.getCurrentSchemaPlus().add(entry.getKey(), UdfImpl.create(method));
}
}
}
/** Register a UDF function which can be used in SQL expression. */
public void registerUdf(String functionName, Class<?> clazz, String method) {
- defaultSchema.add(functionName, UdfImpl.create(clazz, method));
+ connection.getCurrentSchemaPlus().add(functionName, UdfImpl.create(clazz, method));
}
/** Register a UDF function which can be used in SQL expression. */
@@ -108,7 +103,7 @@ public void registerUdf(String functionName, SerializableFunction sfn) {
* org.apache.beam.sdk.transforms.Combine.CombineFn} on how to implement a UDAF.
*/
public void registerUdaf(String functionName, Combine.CombineFn combineFn) {
- defaultSchema.add(functionName, new UdafImpl(combineFn));
+ connection.getCurrentSchemaPlus().add(functionName, new UdafImpl(combineFn));
}
/** Load all UDF/UDAF from {@link UdfUdafProvider}. */
@@ -160,7 +155,7 @@ public void executeDdl(String sqlStatement) throws ParseException {
}
public Map<String, String> getPipelineOptions() {
- return ((BeamCalciteSchema) CalciteSchema.from(defaultSchema).schema).getPipelineOptions();
+ return connection.getPipelineOptionsMap();
}
public String explain(String sqlString) throws ParseException {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteConnectionWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteConnectionWrapper.java
new file mode 100644
index 000000000000..e376ae5d0c48
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteConnectionWrapper.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.lang.reflect.Type;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.config.CalciteConnectionConfig;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.calcite.linq4j.Enumerator;
+import org.apache.calcite.linq4j.Queryable;
+import org.apache.calcite.linq4j.tree.Expression;
+import org.apache.calcite.schema.SchemaPlus;
+
+/**
+ * Abstract wrapper for {@link CalciteConnection} to simplify extension.
+ *
+ * <p>The Calcite JDBC model lacks a convenient Connection class to extend that would also be
+ * supported by its factories without significant copy-pasting.
+ *
+ * <p>The purpose of this class is to hide the delegation logic from the children classes ({@link
+ * JdbcConnection}) to make them cleaner and easier to read. It has no functional significance.
+ *
+ * <p>This class only delegates to the underlying {@link CalciteConnection}, all added or modified
+ * functionality should go into subclasses.
+ *
+ * <p>Ultimately a patch to Calcite can be made to simplify this logic.
+ */
+public abstract class CalciteConnectionWrapper implements CalciteConnection {
+ private CalciteConnection connection;
+
+ protected CalciteConnectionWrapper(CalciteConnection connection) {
+ this.connection = connection;
+ }
+
+ protected CalciteConnection connection() {
+ return connection;
+ }
+
+ @Override
+ public SchemaPlus getRootSchema() {
+ return connection.getRootSchema();
+ }
+
+ @Override
+ public JavaTypeFactory getTypeFactory() {
+ return connection.getTypeFactory();
+ }
+
+ @Override
+ public Properties getProperties() {
+ return connection.getProperties();
+ }
+
+ @Override
+ public Statement createStatement() throws SQLException {
+ return connection.createStatement();
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql) throws SQLException {
+ return connection.prepareStatement(sql);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql) throws SQLException {
+ return connection.prepareCall(sql);
+ }
+
+ @Override
+ public String nativeSQL(String sql) throws SQLException {
+ return connection.nativeSQL(sql);
+ }
+
+ @Override
+ public void setAutoCommit(boolean autoCommit) throws SQLException {
+ connection.setAutoCommit(autoCommit);
+ }
+
+ @Override
+ public boolean getAutoCommit() throws SQLException {
+ return connection.getAutoCommit();
+ }
+
+ @Override
+ public void commit() throws SQLException {
+ connection.commit();
+ }
+
+ @Override
+ public void rollback() throws SQLException {
+ connection.rollback();
+ }
+
+ @Override
+ public void close() throws SQLException {
+ connection.close();
+ }
+
+ @Override
+ public boolean isClosed() throws SQLException {
+ return connection.isClosed();
+ }
+
+ @Override
+ public DatabaseMetaData getMetaData() throws SQLException {
+ return connection.getMetaData();
+ }
+
+ @Override
+ public void setReadOnly(boolean readOnly) throws SQLException {
+ connection.setReadOnly(readOnly);
+ }
+
+ @Override
+ public boolean isReadOnly() throws SQLException {
+ return connection.isReadOnly();
+ }
+
+ @Override
+ public void setCatalog(String catalog) throws SQLException {
+ connection.setCatalog(catalog);
+ }
+
+ @Override
+ public String getCatalog() throws SQLException {
+ return connection.getCatalog();
+ }
+
+ @Override
+ public void setTransactionIsolation(int level) throws SQLException {
+ connection.setTransactionIsolation(level);
+ }
+
+ @Override
+ public int getTransactionIsolation() throws SQLException {
+ return connection.getTransactionIsolation();
+ }
+
+ @Override
+ public SQLWarning getWarnings() throws SQLException {
+ return connection.getWarnings();
+ }
+
+ @Override
+ public void clearWarnings() throws SQLException {
+ connection.clearWarnings();
+ }
+
+ @Override
+ public Statement createStatement(int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return connection.createStatement(resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return connection.prepareStatement(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency)
+ throws SQLException {
+ return connection.prepareCall(sql, resultSetType, resultSetConcurrency);
+ }
+
+ @Override
+ public Map<String, Class<?>> getTypeMap() throws SQLException {
+ return connection.getTypeMap();
+ }
+
+ @Override
+ public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+ connection.setTypeMap(map);
+ }
+
+ @Override
+ public void setHoldability(int holdability) throws SQLException {
+ connection.setHoldability(holdability);
+ }
+
+ @Override
+ public int getHoldability() throws SQLException {
+ return connection.getHoldability();
+ }
+
+ @Override
+ public Savepoint setSavepoint() throws SQLException {
+ return connection.setSavepoint();
+ }
+
+ @Override
+ public Savepoint setSavepoint(String name) throws SQLException {
+ return connection.setSavepoint(name);
+ }
+
+ @Override
+ public void rollback(Savepoint savepoint) throws SQLException {
+ connection.rollback(savepoint);
+ }
+
+ @Override
+ public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+ connection.releaseSavepoint(savepoint);
+ }
+
+ @Override
+ public Statement createStatement(
+ int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
+ return connection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(
+ String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+ throws SQLException {
+ return connection.prepareStatement(
+ sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public CallableStatement prepareCall(
+ String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability)
+ throws SQLException {
+ return connection.prepareCall(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+ return connection.prepareStatement(sql, autoGeneratedKeys);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
+ return connection.prepareStatement(sql, columnIndexes);
+ }
+
+ @Override
+ public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
+ return connection.prepareStatement(sql, columnNames);
+ }
+
+ @Override
+ public Clob createClob() throws SQLException {
+ return connection.createClob();
+ }
+
+ @Override
+ public Blob createBlob() throws SQLException {
+ return connection.createBlob();
+ }
+
+ @Override
+ public NClob createNClob() throws SQLException {
+ return connection.createNClob();
+ }
+
+ @Override
+ public SQLXML createSQLXML() throws SQLException {
+ return connection.createSQLXML();
+ }
+
+ @Override
+ public boolean isValid(int timeout) throws SQLException {
+ return connection.isValid(timeout);
+ }
+
+ @Override
+ public void setClientInfo(String name, String value) throws SQLClientInfoException {
+ connection.setClientInfo(name, value);
+ }
+
+ @Override
+ public void setClientInfo(Properties properties) throws SQLClientInfoException {
+ connection.setClientInfo(properties);
+ }
+
+ @Override
+ public String getClientInfo(String name) throws SQLException {
+ return connection.getClientInfo(name);
+ }
+
+ @Override
+ public Properties getClientInfo() throws SQLException {
+ return connection.getClientInfo();
+ }
+
+ @Override
+ public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+ return connection.createArrayOf(typeName, elements);
+ }
+
+ @Override
+ public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
+ return connection.createStruct(typeName, attributes);
+ }
+
+ @Override
+ public void setSchema(String schema) throws SQLException {
+ connection.setSchema(schema);
+ }
+
+ @Override
+ public String getSchema() throws SQLException {
+ return connection.getSchema();
+ }
+
+ @Override
+ public void abort(Executor executor) throws SQLException {
+ connection.abort(executor);
+ }
+
+ @Override
+ public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
+ connection.setNetworkTimeout(executor, milliseconds);
+ }
+
+ @Override
+ public int getNetworkTimeout() throws SQLException {
+ return connection.getNetworkTimeout();
+ }
+
+ @Override
+ public CalciteConnectionConfig config() {
+ return connection.config();
+ }
+
+ @Override
+ public CalcitePrepare.Context createPrepareContext() {
+ return connection.createPrepareContext();
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> iface) throws SQLException {
+ return connection.unwrap(iface);
+ }
+
+ @Override
+ public boolean isWrapperFor(Class<?> iface) throws SQLException {
+ return connection.isWrapperFor(iface);
+ }
+
+ @Override
+ public <T> Queryable<T> createQuery(Expression expression, Class<T> rowType) {
+ return connection.createQuery(expression, rowType);
+ }
+
+ @Override
+ public <T> Queryable<T> createQuery(Expression expression, Type rowType) {
+ return connection.createQuery(expression, rowType);
+ }
+
+ @Override
+ public <T> T execute(Expression expression, Class<T> type) {
+ return connection.execute(expression, type);
+ }
+
+ @Override
+ public Object execute(Expression expression, Type type) {
+ return connection.execute(expression, type);
+ }
+
+ @Override
+ public <T> Enumerator<T> executeQuery(Queryable<T> queryable) {
+ return connection.executeQuery(queryable);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
new file mode 100644
index 000000000000..a4a7f479c10b
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import com.google.common.collect.ImmutableMap;
+import java.sql.SQLException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.values.KV;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.calcite.schema.SchemaPlus;
+
+/**
+ * Beam JDBC Connection.
+ *
+ * <p>Implements and delegates to {@link CalciteConnection}, adds Beam-specific helper methods.
+ * {@link BeamCalciteSchema BeamCalciteSchemas} keep reference to this connection. Pipeline options
+ * are stored here.
+ */
+public class JdbcConnection extends CalciteConnectionWrapper {
+ /**
+ * Connection string parameters that begin with {@code "beam."} will be interpreted as {@link
+ * PipelineOptions}.
+ */
+ private static final String PIPELINE_OPTION_PREFIX = "beam.";
+
+ private Map<String, String> pipelineOptionsMap;
+
+ private JdbcConnection(CalciteConnection connection) throws SQLException {
+ super(connection);
+ this.pipelineOptionsMap = Collections.emptyMap();
+ }
+
+ /**
+ * Wraps and initializes the initial connection created by Calcite.
+ *
+ * <p>Sets the pipeline options, replaces the initial non-functional top-level schema with schema
+ * created by {@link BeamCalciteSchemaFactory}.
+ */
+ static @Nullable JdbcConnection initialize(CalciteConnection connection) throws SQLException {
+ if (connection == null) {
+ return null;
+ }
+
+ JdbcConnection jdbcConnection = new JdbcConnection(connection);
+ jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
+ jdbcConnection.setSchema(
+ connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection));
+ return jdbcConnection;
+ }
+
+ /**
+ * Reads the connection properties starting with {@link #PIPELINE_OPTION_PREFIX} and converts them
+ * to a map of pipeline options.
+ */
+ private static Map<String, String> extractPipelineOptions(CalciteConnection calciteConnection) {
+ return calciteConnection
+ .getProperties()
+ .entrySet()
+ .stream()
+ .map(entry -> KV.of(entry.getKey().toString(), entry.getValue().toString()))
+ .filter(kv -> kv.getKey().startsWith(PIPELINE_OPTION_PREFIX))
+ .map(kv -> KV.of(kv.getKey().substring(PIPELINE_OPTION_PREFIX.length()), kv.getValue()))
+ .collect(Collectors.toMap(KV::getKey, KV::getValue));
+ }
+
+ Map<String, String> getPipelineOptionsMap() {
+ return pipelineOptionsMap;
+ }
+
+ /**
+ * Called during initialization and from {@link BeamCalciteSchema}. This is needed to enable the
+ * {@code SET pipelineOption = blah} syntax.
+ */
+ public void setPipelineOptionsMap(Map<String, String> pipelineOptionsMap) {
+ this.pipelineOptionsMap = ImmutableMap.copyOf(pipelineOptionsMap);
+ }
+
+ /** Get the current default schema from the root schema. */
+ @SuppressWarnings("TypeParameterUnusedInFormals")
+ <T> T getCurrentBeamSchema() {
+ try {
+ return (T) CalciteSchema.from(getRootSchema().getSubSchema(getSchema())).schema;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Calcite-created {@link SchemaPlus} wrapper for the current schema. */
+ SchemaPlus getCurrentSchemaPlus() {
+ try {
+ return getRootSchema().getSubSchema(getSchema());
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Sets the top-level schema '{@code name}' to '{@code tableProvider}'.
+ *
+ * <p>Overrides the schema if it exists.
+ */
+ void setSchema(String name, TableProvider tableProvider) {
+ BeamCalciteSchema beamCalciteSchema = new BeamCalciteSchema(this, tableProvider);
+ SchemaPlus addedSchemaPlus = getRootSchema().add(name, beamCalciteSchema);
+ addedSchemaPlus.setCacheEnabled(false);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index ce656ff73346..0cf8423ffa63 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -17,6 +17,12 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
+import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE;
+import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
+import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM;
import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
import com.google.auto.service.AutoService;
@@ -24,23 +30,18 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.apache.calcite.avatica.BuiltInConnectionProperty;
-import org.apache.calcite.avatica.ConnectStringParser;
import org.apache.calcite.avatica.ConnectionProperty;
-import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
@@ -50,7 +51,6 @@
import org.apache.calcite.rel.rules.CalcRemoveRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.runtime.Hook;
-import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.RuleSet;
/**
@@ -66,14 +66,7 @@
public class JdbcDriver extends Driver {
public static final JdbcDriver INSTANCE = new JdbcDriver();
public static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
-
- /**
- * Querystring parameters that begin with {@code "beam."} will be interpreted as {@link
- * PipelineOptions}.
- */
- public static final String BEAM_QUERYSTRING_PREFIX = "beam.";
-
- private static final String BEAM_CALCITE_SCHEMA = "beamCalciteSchema";
+ static final String TOP_LEVEL_BEAM_SCHEMA = "beam";
static {
ClassLoader origLoader = Thread.currentThread().getContextClassLoader();
@@ -120,79 +113,89 @@ protected String getConnectStringPrefix() {
return CONNECT_STRING_PREFIX;
}
+ /**
+ * Configures Beam-specific options and opens a JDBC connection to Calcite.
+ *
+ * <p>If {@code originalConnectionProperties} doesn't have the Beam-specific properties, populates
+ * them with defaults (e.g. sets the default schema name to "beam").
+ *
+ * <p>Returns null if {@code url} is rejected by {@code acceptsURL}, i.e. it doesn't begin with
+ * {@link #CONNECT_STRING_PREFIX}. The {@code java.sql.Driver#connect} contract requires this: the
+ * JDBC driver manager offers a URL to each registered driver in turn, and a driver signals "not
+ * mine" by returning null so that the next driver is tried.
+ *
+ * @param url JDBC connection URL
+ * @param originalConnectionProperties caller-supplied connection properties; they are copied
+ * before defaults are applied, the argument itself is not modified
+ * @return a {@link JdbcConnection}, which is a Beam wrapper around {@link CalciteConnection}, or
+ * null if this driver does not handle {@code url}
+ * @throws SQLException if opening the connection fails
+ */
@Override
- public Connection connect(String url, Properties info) throws SQLException {
+ public Connection connect(String url, Properties originalConnectionProperties)
+ throws SQLException {
+
+ // Must run before the properties are touched: returning null here is how this driver declines
+ // URLs that belong to other JDBC drivers, so do not remove or reorder this check.
if (!acceptsURL(url)) {
return null;
}
- final BeamCalciteSchema beamCalciteSchema = (BeamCalciteSchema) info.get(BEAM_CALCITE_SCHEMA);
-
- Properties info2 = new Properties(info);
- setDefault(info2, BuiltInConnectionProperty.TIME_ZONE, "UTC");
- setDefault(info2, CalciteConnectionProperty.LEX, Lex.JAVA.name());
- setDefault(
- info2,
- CalciteConnectionProperty.PARSER_FACTORY,
- BeamSqlParserImpl.class.getName() + "#FACTORY");
- setDefault(info2, CalciteConnectionProperty.TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
- setDefault(info2, CalciteConnectionProperty.SCHEMA, "beam");
- setDefault(
- info2, CalciteConnectionProperty.SCHEMA_FACTORY, BeamCalciteSchemaFactory.class.getName());
-
- CalciteConnection connection = (CalciteConnection) super.connect(url, info2);
- final SchemaPlus defaultSchema;
- if (beamCalciteSchema != null) {
- defaultSchema =
- connection.getRootSchema().add(connection.config().schema(), beamCalciteSchema);
- connection.setSchema(defaultSchema.getName());
- } else {
- defaultSchema = getDefaultSchema(connection);
- }
+ Properties connectionProps = ensureDefaultProperties(originalConnectionProperties);
+ CalciteConnection calciteConnection = (CalciteConnection) super.connect(url, connectionProps);
- // Beam schema may change without notifying Calcite
- defaultSchema.setCacheEnabled(false);
-
- // Set default PipelineOptions to which we apply the querystring
- Map<String, String> pipelineOptionsMap =
- ((BeamCalciteSchema) CalciteSchema.from(defaultSchema).schema).getPipelineOptions();
- ReleaseInfo releaseInfo = ReleaseInfo.getReleaseInfo();
- pipelineOptionsMap.put("userAgent", String.format("BeamSQL/%s", releaseInfo.getVersion()));
-
- String querystring = url.substring(CONNECT_STRING_PREFIX.length());
- for (Map.Entry<Object, Object> propertyValue :
- ConnectStringParser.parse(querystring).entrySet()) {
- String name = (String) propertyValue.getKey();
- if (name.startsWith(BEAM_QUERYSTRING_PREFIX)) {
- pipelineOptionsMap.put(
- name.substring(BEAM_QUERYSTRING_PREFIX.length()), (String) propertyValue.getValue());
- }
- }
- return connection;
+ // calciteConnection is initialized with an empty Beam schema,
+ // we need to populate it with pipeline options, load table providers, etc
+ return JdbcConnection.initialize(calciteConnection);
+ }
+
+ /**
+ * Returns a copy of {@code originalInfo} in which the required default properties are set.
+ *
+ * <p>Each of the following is set only when it has no value yet: time zone ("UTC"), lexical
+ * policy ({@link Lex#JAVA}), parser factory ({@link BeamSqlParserImpl}), rel data type system
+ * ({@link BeamRelDataTypeSystem}), default schema name ({@link #TOP_LEVEL_BEAM_SCHEMA}) and schema
+ * factory ({@link BeamCalciteSchemaFactory.AllProviders}). The {@code beam.userAgent} entry, by
+ * contrast, is always written and replaces any value already stored under that key.
+ *
+ * <p>The specified Beam schema factory will be used by Calcite to create the initial top level
+ * Beam schema. It can be later overridden by setting the schema via {@link
+ * JdbcConnection#setSchema(String, TableProvider)}.
+ *
+ * <p>The copy is made with {@code putAll}, so {@code originalInfo} itself is never modified.
+ * NOTE(review): {@code putAll} copies only the entries stored directly in {@code originalInfo},
+ * not a defaults list it may have been constructed with - confirm no caller relies on one.
+ *
+ * @param originalInfo connection properties supplied by the caller; must not be null
+ * @return a new {@link Properties} holding the supplied entries plus the defaults
+ */
+ private Properties ensureDefaultProperties(Properties originalInfo) {
+ Properties info = new Properties();
+ info.putAll(originalInfo);
+
+ setIfNull(info, TIME_ZONE, "UTC");
+ setIfNull(info, LEX, Lex.JAVA.name());
+ setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY");
+ setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
+ setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA);
+ setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName());
+
+ info.put("beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion());
+
+ return info;
}
- private static void setDefault(Properties info, ConnectionProperty key, String value) {
+ private static void setIfNull(Properties info, ConnectionProperty key, String value) {
// A null value indicates the default. We want to override defaults only.
if (info.getProperty(key.camelName()) == null) {
info.setProperty(key.camelName(), value);
}
}
- @VisibleForTesting
- public static CalciteConnection connect(TableProvider tableProvider) {
- try {
- Properties info = new Properties();
- info.put(BEAM_CALCITE_SCHEMA, new BeamCalciteSchema(tableProvider));
- return (CalciteConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, info);
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static SchemaPlus getDefaultSchema(CalciteConnection connection) {
+ /**
+ * Connects to the driver using standard {@link #connect(String, Properties)} call, but overrides
+ * the initial schema factory. Default factory would load up all table providers. The one
+ * specified here doesn't load any providers. We then override the top-level schema with the
+ * {@code tableProvider}.
+ *
+ * <p>This is called in tests and {@link BeamSqlEnv}, core part of {@link SqlTransform}. CLI uses
+ * standard JDBC driver registry, and goes through {@link #connect(String, Properties)} instead,
+ * not this path. The CLI ends up using the schema factory that populates the default schema with
+ * all table providers it can find. See {@link BeamCalciteSchemaFactory}.
+ */
+ public static JdbcConnection connect(TableProvider tableProvider) {
try {
- String defaultSchemaName = connection.getSchema();
- return connection.getRootSchema().getSubSchema(defaultSchemaName);
+ Properties properties = new Properties();
+ setIfNull(properties, SCHEMA_FACTORY, BeamCalciteSchemaFactory.Empty.class.getName());
+ JdbcConnection connection =
+ (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
+ connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
+ return connection;
} catch (SQLException e) {
throw new RuntimeException(e);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
index a0c05d6841f2..7314305f4a3f 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlSetOptionBeam.java
@@ -19,7 +19,6 @@
import static org.apache.calcite.util.Static.RESOURCE;
-import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
import org.apache.calcite.jdbc.CalcitePrepare;
import org.apache.calcite.jdbc.CalciteSchema;
@@ -48,21 +47,15 @@ public void execute(CalcitePrepare.Context context) {
name.getParserPosition(),
RESOURCE.internal("Schema is not instanceof BeamCalciteSchema"));
}
+
BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema;
- Map<String, String> options = schema.getPipelineOptions();
- if (options == null) {
- throw SqlUtil.newContextException(
- name.getParserPosition(),
- RESOURCE.internal("PipelineOptions not accessible via BeamCalciteSchema"));
- }
- if (value == null) {
- if ("ALL".equals(pair.right)) {
- options.clear();
- } else {
- options.remove(pair.right);
- }
+
+ if (value != null) {
+ schema.setPipelineOption(pair.right, SqlDdlNodes.getString(value));
+ } else if ("ALL".equals(pair.right)) {
+ schema.removeAllPipelineOptions();
} else {
- options.put(pair.right, SqlDdlNodes.getString(value));
+ schema.removePipelineOption(pair.right);
}
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index cfbcb7c00378..e1a47d13c55e 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -19,6 +19,7 @@
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.meta.Table;
/**
@@ -26,6 +27,10 @@
*
* <p>So there will be a provider to handle text file (CSV) based tables, a provider to handle
* MySQL based tables, a provider to handle Cassandra based tables, etc.
+ *
+ * <p><i>Note:</i> all implementations marked with {@code @AutoService(TableProvider.class)} are
+ * automatically loaded by the CLI, and in any other case where {@link JdbcDriver} is used with
+ * default connection parameters.
*/
public interface TableProvider {
/** Gets the table type this provider handles. */
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index bc6d93d93087..062700ba0fe6 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -30,7 +30,6 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -39,8 +38,8 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -339,24 +338,20 @@ private CalciteConnection connect(PipelineOptions options, TableProvider... tabl
throws SQLException {
// HACK: PipelineOptions should expose a prominent method to do this reliably
// The actual options are in the "options" field of the converted map
- Map<String, Object> optionsMap =
- (Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options");
Map<String, String> argsMap =
- optionsMap
+ ((Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options"))
.entrySet()
.stream()
- .collect(Collectors.toMap(entry -> entry.getKey(), entry -> toArg(entry.getValue())));
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
for (TableProvider tableProvider : tableProviders) {
inMemoryMetaStore.registerProvider(tableProvider);
}
- Properties info = new Properties();
- BeamCalciteSchema dbSchema = new BeamCalciteSchema(inMemoryMetaStore);
- dbSchema.getPipelineOptions().putAll(argsMap);
- info.put(BEAM_CALCITE_SCHEMA, dbSchema);
- return (CalciteConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, info);
+ JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore);
+ connection.setPipelineOptionsMap(argsMap);
+ return connection;
}
private static Boolean containsAll(Set<PubsubMessage> set, PubsubMessage... subsetCandidate) {
With regards,
Apache Git Services