You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/01/18 18:41:54 UTC
[beam] branch master updated: [SQL] Force cacheless root schema in
Jdbc connection
This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 20d6093 [SQL] Force cacheless root schema in Jdbc connection
new 6a59667 Merge pull request #7553 from akedin/cacheless-root-schema
20d6093 is described below
commit 20d6093e4a4f864091d821d5320a5468a7547b4f
Author: akedin <ke...@google.com>
AuthorDate: Thu Jan 17 16:04:34 2019 -0800
[SQL] Force cacheless root schema in Jdbc connection
---
.../extensions/sql/impl/CalciteFactoryWrapper.java | 110 ++++++++++++++++++++
.../sdk/extensions/sql/impl/JdbcConnection.java | 26 ++---
.../beam/sdk/extensions/sql/impl/JdbcDriver.java | 68 ++----------
.../beam/sdk/extensions/sql/impl/JdbcFactory.java | 115 +++++++++++++++++++++
.../sdk/extensions/sql/impl/JdbcDriverTest.java | 12 +++
5 files changed, 262 insertions(+), 69 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java
new file mode 100644
index 0000000..a039154
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.TimeZone;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.AvaticaPreparedStatement;
+import org.apache.calcite.avatica.AvaticaResultSet;
+import org.apache.calcite.avatica.AvaticaSpecificDatabaseMetaData;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.UnregisteredDriver;
+import org.apache.calcite.jdbc.CalciteFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+
+/**
+ * Wrapper for {@link CalciteFactory}.
+ *
+ * <p>This class adds no behavior of its own; it only delegates to the underlying {@link
+ * CalciteFactory}. Its purpose is to keep the delegation boilerplate out of {@link JdbcFactory}.
+ */
+public abstract class CalciteFactoryWrapper extends CalciteFactory {
+
+ protected CalciteFactory factory;
+
+ CalciteFactoryWrapper(CalciteFactory factory) {
+ super(4, 1);
+ this.factory = factory;
+ }
+
+ @Override
+ public AvaticaConnection newConnection(
+ UnregisteredDriver driver,
+ AvaticaFactory avaticaFactory,
+ String url,
+ Properties info,
+ CalciteSchema rootSchema,
+ JavaTypeFactory typeFactory) {
+
+ return this.factory.newConnection(driver, avaticaFactory, url, info, rootSchema, typeFactory);
+ }
+
+ @Override
+ public AvaticaStatement newStatement(
+ AvaticaConnection connection,
+ Meta.StatementHandle h,
+ int resultSetType,
+ int resultSetConcurrency,
+ int resultSetHoldability)
+ throws SQLException {
+ return this.factory.newStatement(
+ connection, h, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public AvaticaPreparedStatement newPreparedStatement(
+ AvaticaConnection connection,
+ Meta.StatementHandle h,
+ Meta.Signature signature,
+ int resultSetType,
+ int resultSetConcurrency,
+ int resultSetHoldability)
+ throws SQLException {
+ return this.factory.newPreparedStatement(
+ connection, h, signature, resultSetType, resultSetConcurrency, resultSetHoldability);
+ }
+
+ @Override
+ public AvaticaResultSet newResultSet(
+ AvaticaStatement statement,
+ QueryState state,
+ Meta.Signature signature,
+ TimeZone timeZone,
+ Meta.Frame firstFrame)
+ throws SQLException {
+ return this.factory.newResultSet(statement, state, signature, timeZone, firstFrame);
+ }
+
+ @Override
+ public AvaticaSpecificDatabaseMetaData newDatabaseMetaData(AvaticaConnection connection) {
+ return this.factory.newDatabaseMetaData(connection);
+ }
+
+ @Override
+ public ResultSetMetaData newResultSetMetaData(
+ AvaticaStatement statement, Meta.Signature signature) throws SQLException {
+ return this.factory.newResultSetMetaData(statement, signature);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
index 318d90d..e3c2aa6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
@@ -57,17 +57,20 @@ public class JdbcConnection extends CalciteConnectionWrapper {
* <p>Sets the pipeline options, replaces the initial non-functional top-level schema with schema
* created by {@link BeamCalciteSchemaFactory}.
*/
- static @Nullable JdbcConnection initialize(CalciteConnection connection) throws SQLException {
- if (connection == null) {
- return null;
- }
+ static @Nullable JdbcConnection initialize(CalciteConnection connection) {
+ try {
+ if (connection == null) {
+ return null;
+ }
- JdbcConnection jdbcConnection = new JdbcConnection(connection);
- jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
- jdbcConnection.getRootSchema().setCacheEnabled(false);
- jdbcConnection.setSchema(
- connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection));
- return jdbcConnection;
+ JdbcConnection jdbcConnection = new JdbcConnection(connection);
+ jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
+ jdbcConnection.setSchema(
+ connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection));
+ return jdbcConnection;
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
}
/**
@@ -120,7 +123,6 @@ public class JdbcConnection extends CalciteConnectionWrapper {
*/
void setSchema(String name, TableProvider tableProvider) {
BeamCalciteSchema beamCalciteSchema = new BeamCalciteSchema(this, tableProvider);
- SchemaPlus addedSchemaPlus = getRootSchema().add(name, beamCalciteSchema);
- addedSchemaPlus.setCacheEnabled(false);
+ getRootSchema().add(name, beamCalciteSchema);
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index 0cf8423..bb7cc42 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -17,12 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql.impl;
-import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE;
-import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
-import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY;
-import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA;
import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
-import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM;
import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
import com.google.auto.service.AutoService;
@@ -33,15 +28,12 @@ import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.calcite.avatica.ConnectionProperty;
-import org.apache.calcite.config.Lex;
+import org.apache.calcite.avatica.AvaticaFactory;
import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteFactory;
import org.apache.calcite.jdbc.Driver;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
@@ -109,6 +101,11 @@ public class JdbcDriver extends Driver {
}
@Override
+ protected AvaticaFactory createFactory() {
+ return JdbcFactory.wrap((CalciteFactory) super.createFactory());
+ }
+
+ @Override
protected String getConnectStringPrefix() {
return CONNECT_STRING_PREFIX;
}
@@ -127,54 +124,10 @@ public class JdbcDriver extends Driver {
* CalciteConnection}.
*/
@Override
- public Connection connect(String url, Properties originalConnectionProperties)
- throws SQLException {
-
- // do this check before even looking into properties
- // do not remove this, please
- if (!acceptsURL(url)) {
- return null;
- }
-
- Properties connectionProps = ensureDefaultProperties(originalConnectionProperties);
- CalciteConnection calciteConnection = (CalciteConnection) super.connect(url, connectionProps);
-
+ public Connection connect(String url, Properties info) throws SQLException {
// calciteConnection is initialized with an empty Beam schema,
// we need to populate it with pipeline options, load table providers, etc
- return JdbcConnection.initialize(calciteConnection);
- }
-
- /**
- * Make sure required default properties are set.
- *
- * <p>Among other things sets up the parser class name, rel data type system and default schema
- * factory.
- *
- * <p>The specified Beam schema factory will be used by Calcite to create the initial top level
- * Beam schema. It can be later overridden by setting the schema via {@link
- * JdbcConnection#setSchema(String, TableProvider)}.
- */
- private Properties ensureDefaultProperties(Properties originalInfo) {
- Properties info = new Properties();
- info.putAll(originalInfo);
-
- setIfNull(info, TIME_ZONE, "UTC");
- setIfNull(info, LEX, Lex.JAVA.name());
- setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY");
- setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
- setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA);
- setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName());
-
- info.put("beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion());
-
- return info;
- }
-
- private static void setIfNull(Properties info, ConnectionProperty key, String value) {
- // A null value indicates the default. We want to override defaults only.
- if (info.getProperty(key.camelName()) == null) {
- info.setProperty(key.camelName(), value);
- }
+ return JdbcConnection.initialize((CalciteConnection) super.connect(url, info));
}
/**
@@ -191,7 +144,8 @@ public class JdbcDriver extends Driver {
public static JdbcConnection connect(TableProvider tableProvider) {
try {
Properties properties = new Properties();
- setIfNull(properties, SCHEMA_FACTORY, BeamCalciteSchemaFactory.Empty.class.getName());
+ properties.setProperty(
+ SCHEMA_FACTORY.camelName(), BeamCalciteSchemaFactory.Empty.class.getName());
JdbcConnection connection =
(JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java
new file mode 100644
index 0000000..70c1229
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.extensions.sql.impl.JdbcDriver.TOP_LEVEL_BEAM_SCHEMA;
+import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE;
+import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
+import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM;
+
+import java.util.Properties;
+import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.ConnectionProperty;
+import org.apache.calcite.avatica.UnregisteredDriver;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+
+/**
+ * Implements {@link CalciteFactory} that is used by Calcite JDBC driver to instantiate different
+ * JDBC objects, like connections, result sets, etc.
+ *
+ * <p>The purpose of this class is to intercept the connection creation and force a cache-less root
+ * schema ({@link org.apache.calcite.jdbc.SimpleCalciteSchema}). Otherwise Calcite uses {@link
+ * org.apache.calcite.jdbc.CachingCalciteSchema} that eagerly caches table information. This
+ * behavior does not work well for dynamic table providers.
+ */
+class JdbcFactory extends CalciteFactoryWrapper {
+
+ JdbcFactory(CalciteFactory factory) {
+ super(factory);
+ }
+
+ static JdbcFactory wrap(CalciteFactory calciteFactory) {
+ return new JdbcFactory(calciteFactory);
+ }
+
+ @Override
+ public AvaticaConnection newConnection(
+ UnregisteredDriver driver,
+ AvaticaFactory avaticaFactory,
+ String url,
+ Properties info,
+ CalciteSchema rootSchema,
+ JavaTypeFactory typeFactory) {
+
+ Properties connectionProps = ensureDefaultProperties(info);
+ CalciteSchema actualRootSchema = rootSchema;
+ if (rootSchema == null) {
+ actualRootSchema = CalciteSchema.createRootSchema(true, false, "");
+ }
+
+ return super.newConnection(
+ driver, avaticaFactory, url, connectionProps, actualRootSchema, typeFactory);
+ }
+
+ /**
+ * Make sure required default properties are set.
+ *
+ * <p>Among other things sets up the parser class name, rel data type system and default schema
+ * factory.
+ *
+ * <p>The specified Beam schema factory will be used by Calcite to create the initial top level
+ * Beam schema. It can be later overridden by setting the schema via {@link
+ * JdbcConnection#setSchema(String, TableProvider)}.
+ */
+ private Properties ensureDefaultProperties(Properties originalInfo) {
+ Properties info = new Properties();
+ info.putAll(originalInfo);
+
+ setIfNull(info, TIME_ZONE, "UTC");
+ setIfNull(info, LEX, Lex.JAVA.name());
+ setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY");
+ setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
+ setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA);
+ setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName());
+ setIfNull(info, "beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion());
+
+ return info;
+ }
+
+ private static void setIfNull(Properties info, ConnectionProperty key, String value) {
+ setIfNull(info, key.camelName(), value);
+ }
+
+ private static void setIfNull(Properties info, String key, String value) {
+ // A null value indicates the default. We want to override defaults only.
+ if (info.getProperty(key) == null) {
+ info.setProperty(key, value);
+ }
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index a7525d2..6f36173 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.calcite.jdbc.CalciteConnection;
@@ -120,6 +121,17 @@ public class JdbcDriverTest {
assertThat(pipelineOptions.get("userAgent"), containsString("BeamSQL"));
}
+ /** Tests that userAgent is set. */
+ @Test
+ public void testDriverManager_hasUserAgent() throws Exception {
+ JdbcConnection connection =
+ (JdbcConnection) DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX);
+ BeamCalciteSchema schema = connection.getCurrentBeamSchema();
+ assertThat(
+ schema.getPipelineOptions().get("userAgent"),
+ equalTo("BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion()));
+ }
+
/** Tests that userAgent can be overridden on the querystring. */
@Test
public void testDriverManager_setUserAgent() throws Exception {