You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/01/18 18:41:54 UTC

[beam] branch master updated: [SQL] Force cacheless root schema in Jdbc connection

This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 20d6093  [SQL] Force cacheless root schema in Jdbc connection
     new 6a59667  Merge pull request #7553 from akedin/cacheless-root-schema
20d6093 is described below

commit 20d6093e4a4f864091d821d5320a5468a7547b4f
Author: akedin <ke...@google.com>
AuthorDate: Thu Jan 17 16:04:34 2019 -0800

    [SQL] Force cacheless root schema in Jdbc connection
---
 .../extensions/sql/impl/CalciteFactoryWrapper.java | 110 ++++++++++++++++++++
 .../sdk/extensions/sql/impl/JdbcConnection.java    |  26 ++---
 .../beam/sdk/extensions/sql/impl/JdbcDriver.java   |  68 ++----------
 .../beam/sdk/extensions/sql/impl/JdbcFactory.java  | 115 +++++++++++++++++++++
 .../sdk/extensions/sql/impl/JdbcDriverTest.java    |  12 +++
 5 files changed, 262 insertions(+), 69 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java
new file mode 100644
index 0000000..a039154
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CalciteFactoryWrapper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.TimeZone;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.AvaticaPreparedStatement;
+import org.apache.calcite.avatica.AvaticaResultSet;
+import org.apache.calcite.avatica.AvaticaSpecificDatabaseMetaData;
+import org.apache.calcite.avatica.AvaticaStatement;
+import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.QueryState;
+import org.apache.calcite.avatica.UnregisteredDriver;
+import org.apache.calcite.jdbc.CalciteFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+
+/**
+ * Base {@link CalciteFactory} that forwards every call to a wrapped {@link CalciteFactory}.
+ *
+ * <p>Subclasses such as {@link JdbcFactory} override only the calls they need to intercept. The
+ * JDBC version reported here is taken from the wrapped factory rather than hard-coded.
+ */
+public abstract class CalciteFactoryWrapper extends CalciteFactory {
+
+  protected final CalciteFactory factory;
+
+  CalciteFactoryWrapper(CalciteFactory factory) {
+    super(factory.getJdbcMajorVersion(), factory.getJdbcMinorVersion());
+    this.factory = factory;
+  }
+
+  @Override
+  public AvaticaConnection newConnection(
+      UnregisteredDriver driver,
+      AvaticaFactory avaticaFactory,
+      String url,
+      Properties info,
+      CalciteSchema rootSchema,
+      JavaTypeFactory typeFactory) {
+    // Pure pass-through; subclasses override this to customize connection creation.
+    return this.factory.newConnection(driver, avaticaFactory, url, info, rootSchema, typeFactory);
+  }
+
+  @Override
+  public AvaticaStatement newStatement(
+      AvaticaConnection connection,
+      Meta.StatementHandle h,
+      int resultSetType,
+      int resultSetConcurrency,
+      int resultSetHoldability)
+      throws SQLException {
+    return this.factory.newStatement(
+        connection, h, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public AvaticaPreparedStatement newPreparedStatement(
+      AvaticaConnection connection,
+      Meta.StatementHandle h,
+      Meta.Signature signature,
+      int resultSetType,
+      int resultSetConcurrency,
+      int resultSetHoldability)
+      throws SQLException {
+    return this.factory.newPreparedStatement(
+        connection, h, signature, resultSetType, resultSetConcurrency, resultSetHoldability);
+  }
+
+  @Override
+  public AvaticaResultSet newResultSet(
+      AvaticaStatement statement,
+      QueryState state,
+      Meta.Signature signature,
+      TimeZone timeZone,
+      Meta.Frame firstFrame)
+      throws SQLException {
+    return this.factory.newResultSet(statement, state, signature, timeZone, firstFrame);
+  }
+
+  @Override
+  public AvaticaSpecificDatabaseMetaData newDatabaseMetaData(AvaticaConnection connection) {
+    return this.factory.newDatabaseMetaData(connection);
+  }
+
+  @Override
+  public ResultSetMetaData newResultSetMetaData(
+      AvaticaStatement statement, Meta.Signature signature) throws SQLException {
+    return this.factory.newResultSetMetaData(statement, signature);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
index 318d90d..e3c2aa6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcConnection.java
@@ -57,17 +57,20 @@ public class JdbcConnection extends CalciteConnectionWrapper {
    * <p>Sets the pipeline options, replaces the initial non-functional top-level schema with schema
    * created by {@link BeamCalciteSchemaFactory}.
    */
-  static @Nullable JdbcConnection initialize(CalciteConnection connection) throws SQLException {
-    if (connection == null) {
-      return null;
-    }
+  static @Nullable JdbcConnection initialize(CalciteConnection connection) {
+    try {
+      if (connection == null) {
+        return null;
+      }
 
-    JdbcConnection jdbcConnection = new JdbcConnection(connection);
-    jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
-    jdbcConnection.getRootSchema().setCacheEnabled(false);
-    jdbcConnection.setSchema(
-        connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection));
-    return jdbcConnection;
+      JdbcConnection jdbcConnection = new JdbcConnection(connection);
+      jdbcConnection.setPipelineOptionsMap(extractPipelineOptions(connection));
+      jdbcConnection.setSchema(
+          connection.getSchema(), BeamCalciteSchemaFactory.fromInitialEmptySchema(jdbcConnection));
+      return jdbcConnection;
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   /**
@@ -120,7 +123,6 @@ public class JdbcConnection extends CalciteConnectionWrapper {
    */
   void setSchema(String name, TableProvider tableProvider) {
     BeamCalciteSchema beamCalciteSchema = new BeamCalciteSchema(this, tableProvider);
-    SchemaPlus addedSchemaPlus = getRootSchema().add(name, beamCalciteSchema);
-    addedSchemaPlus.setCacheEnabled(false);
+    getRootSchema().add(name, beamCalciteSchema);
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index 0cf8423..bb7cc42 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -17,12 +17,7 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl;
 
-import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE;
-import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
-import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY;
-import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA;
 import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
-import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM;
 import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
 
 import com.google.auto.service.AutoService;
@@ -33,15 +28,12 @@ import java.util.List;
 import java.util.Properties;
 import java.util.function.Consumer;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
-import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRuleSets;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.ReleaseInfo;
-import org.apache.calcite.avatica.ConnectionProperty;
-import org.apache.calcite.config.Lex;
+import org.apache.calcite.avatica.AvaticaFactory;
 import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.jdbc.CalciteFactory;
 import org.apache.calcite.jdbc.Driver;
 import org.apache.calcite.plan.RelOptPlanner;
 import org.apache.calcite.plan.RelOptRule;
@@ -109,6 +101,11 @@ public class JdbcDriver extends Driver {
   }
 
   @Override
+  protected AvaticaFactory createFactory() {
+    return JdbcFactory.wrap((CalciteFactory) super.createFactory());
+  }
+
+  @Override
   protected String getConnectStringPrefix() {
     return CONNECT_STRING_PREFIX;
   }
@@ -127,54 +124,10 @@ public class JdbcDriver extends Driver {
    * CalciteConnection}.
    */
   @Override
-  public Connection connect(String url, Properties originalConnectionProperties)
-      throws SQLException {
-
-    // do this check before even looking into properties
-    // do not remove this, please
-    if (!acceptsURL(url)) {
-      return null;
-    }
-
-    Properties connectionProps = ensureDefaultProperties(originalConnectionProperties);
-    CalciteConnection calciteConnection = (CalciteConnection) super.connect(url, connectionProps);
-
+  public Connection connect(String url, Properties info) throws SQLException {
     // calciteConnection is initialized with an empty Beam schema,
     // we need to populate it with pipeline options, load table providers, etc
-    return JdbcConnection.initialize(calciteConnection);
-  }
-
-  /**
-   * Make sure required default properties are set.
-   *
-   * <p>Among other things sets up the parser class name, rel data type system and default schema
-   * factory.
-   *
-   * <p>The specified Beam schema factory will be used by Calcite to create the initial top level
-   * Beam schema. It can be later overridden by setting the schema via {@link
-   * JdbcConnection#setSchema(String, TableProvider)}.
-   */
-  private Properties ensureDefaultProperties(Properties originalInfo) {
-    Properties info = new Properties();
-    info.putAll(originalInfo);
-
-    setIfNull(info, TIME_ZONE, "UTC");
-    setIfNull(info, LEX, Lex.JAVA.name());
-    setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY");
-    setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
-    setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA);
-    setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName());
-
-    info.put("beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion());
-
-    return info;
-  }
-
-  private static void setIfNull(Properties info, ConnectionProperty key, String value) {
-    // A null value indicates the default. We want to override defaults only.
-    if (info.getProperty(key.camelName()) == null) {
-      info.setProperty(key.camelName(), value);
-    }
+    return JdbcConnection.initialize((CalciteConnection) super.connect(url, info));
   }
 
   /**
@@ -191,7 +144,8 @@ public class JdbcDriver extends Driver {
   public static JdbcConnection connect(TableProvider tableProvider) {
     try {
       Properties properties = new Properties();
-      setIfNull(properties, SCHEMA_FACTORY, BeamCalciteSchemaFactory.Empty.class.getName());
+      properties.setProperty(
+          SCHEMA_FACTORY.camelName(), BeamCalciteSchemaFactory.Empty.class.getName());
       JdbcConnection connection =
           (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
       connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java
new file mode 100644
index 0000000..70c1229
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl;
+
+import static org.apache.beam.sdk.extensions.sql.impl.JdbcDriver.TOP_LEVEL_BEAM_SCHEMA;
+import static org.apache.calcite.avatica.BuiltInConnectionProperty.TIME_ZONE;
+import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
+import static org.apache.calcite.config.CalciteConnectionProperty.PARSER_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA;
+import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
+import static org.apache.calcite.config.CalciteConnectionProperty.TYPE_SYSTEM;
+
+import java.util.Properties;
+import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamRelDataTypeSystem;
+import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.util.ReleaseInfo;
+import org.apache.calcite.adapter.java.JavaTypeFactory;
+import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaFactory;
+import org.apache.calcite.avatica.ConnectionProperty;
+import org.apache.calcite.avatica.UnregisteredDriver;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteFactory;
+import org.apache.calcite.jdbc.CalciteSchema;
+
+/**
+ * {@link CalciteFactory} used by the Calcite JDBC driver to instantiate JDBC objects such as
+ * connections, statements and result sets.
+ *
+ * <p>Connection creation is intercepted to apply Beam's default connection properties and to
+ * force a cache-less root schema ({@code org.apache.calcite.jdbc.SimpleCalciteSchema}).
+ * Otherwise Calcite uses {@code org.apache.calcite.jdbc.CachingCalciteSchema}, which eagerly
+ * caches table information; that does not work well for dynamic table providers.
+ */
+class JdbcFactory extends CalciteFactoryWrapper {
+
+  JdbcFactory(CalciteFactory factory) {
+    super(factory);
+  }
+
+  /** Creates a {@link JdbcFactory} that delegates to {@code calciteFactory}. */
+  static JdbcFactory wrap(CalciteFactory calciteFactory) {
+    return new JdbcFactory(calciteFactory);
+  }
+
+  @Override
+  public AvaticaConnection newConnection(
+      UnregisteredDriver driver,
+      AvaticaFactory avaticaFactory,
+      String url,
+      Properties info,
+      CalciteSchema rootSchema,
+      JavaTypeFactory typeFactory) {
+    Properties connectionProps = ensureDefaultProperties(info);
+    // Calcite would otherwise create a caching root schema (see class docs), so substitute a
+    // cache-less one unless the caller explicitly supplied a root schema.
+    CalciteSchema actualRootSchema =
+        rootSchema != null ? rootSchema : CalciteSchema.createRootSchema(true, false, "");
+
+    return super.newConnection(
+        driver, avaticaFactory, url, connectionProps, actualRootSchema, typeFactory);
+  }
+
+  /**
+   * Returns a copy of {@code originalInfo} with Beam's default connection properties filled in.
+   *
+   * <p>Sets the time zone, lexical policy, parser factory, rel data type system, default schema,
+   * schema factory and {@code beam.userAgent}, but only where the caller has not set them.
+   *
+   * <p>The specified Beam schema factory will be used by Calcite to create the initial top level
+   * Beam schema. It can be later overridden by setting the schema via {@link
+   * JdbcConnection#setSchema(String, TableProvider)}.
+   */
+  private Properties ensureDefaultProperties(Properties originalInfo) {
+    Properties info = new Properties();
+    info.putAll(originalInfo);
+
+    setIfNull(info, TIME_ZONE, "UTC");
+    setIfNull(info, LEX, Lex.JAVA.name());
+    setIfNull(info, PARSER_FACTORY, BeamSqlParserImpl.class.getName() + "#FACTORY");
+    setIfNull(info, TYPE_SYSTEM, BeamRelDataTypeSystem.class.getName());
+    setIfNull(info, SCHEMA, TOP_LEVEL_BEAM_SCHEMA);
+    setIfNull(info, SCHEMA_FACTORY, BeamCalciteSchemaFactory.AllProviders.class.getName());
+    setIfNull(info, "beam.userAgent", "BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion());
+
+    return info;
+  }
+
+  private static void setIfNull(Properties info, ConnectionProperty key, String value) {
+    setIfNull(info, key.camelName(), value);
+  }
+
+  private static void setIfNull(Properties info, String key, String value) {
+    // Only fill in keys the caller left unset, so explicitly provided values take precedence.
+    if (info.getProperty(key) == null) {
+      info.setProperty(key, value);
+    }
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index a7525d2..6f36173 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.calcite.jdbc.CalciteConnection;
@@ -120,6 +121,17 @@ public class JdbcDriverTest {
     assertThat(pipelineOptions.get("userAgent"), containsString("BeamSQL"));
   }
 
+  /** Tests that the default userAgent is {@code BeamSQL/<release version>}. */
+  @Test
+  public void testDriverManager_hasUserAgent() throws Exception {
+    try (JdbcConnection connection =
+        (JdbcConnection) DriverManager.getConnection(JdbcDriver.CONNECT_STRING_PREFIX)) {
+      assertThat(
+          connection.getCurrentBeamSchema().getPipelineOptions().get("userAgent"),
+          equalTo("BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion()));
+    }
+  }
+
   /** Tests that userAgent can be overridden on the querystring. */
   @Test
   public void testDriverManager_setUserAgent() throws Exception {