You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/06/17 23:48:50 UTC

[beam] 01/01: Revert "[BEAM-7513] Adding Row Count for Bigquery Table"

This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch revert-8822-bigquery-rowcount
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7b7c21544b50b1c8ac96f6c88362a0e266b6d55e
Author: Anton Kedin <33...@users.noreply.github.com>
AuthorDate: Mon Jun 17 16:48:30 2019 -0700

    Revert "[BEAM-7513] Adding Row Count for Bigquery Table"
---
 .../apache/beam/sdk/extensions/sql/BeamSqlCli.java |   7 +-
 .../beam/sdk/extensions/sql/BeamSqlTable.java      |   7 -
 .../beam/sdk/extensions/sql/SqlTransform.java      |   2 -
 .../sql/impl/BeamCalciteSchemaFactory.java         |  12 +-
 .../sdk/extensions/sql/impl/BeamCalciteTable.java  |  23 ---
 .../sql/impl/BeamRowCountStatistics.java           |  44 ------
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   |  26 +---
 .../beam/sdk/extensions/sql/impl/JdbcDriver.java   |  34 +----
 .../sql/meta/provider/bigquery/BigQueryTable.java  |  39 -----
 .../sdk/extensions/sql/impl/BeamSqlEnvTest.java    |   7 +-
 .../sdk/extensions/sql/impl/JdbcDriverTest.java    |  31 ++--
 .../extensions/sql/impl/parser/BeamDDLTest.java    |   7 +-
 ...BeamSqlBuiltinFunctionsIntegrationTestBase.java |   7 +-
 .../meta/provider/bigquery/BigQueryRowCountIT.java | 161 ---------------------
 .../meta/provider/bigquery/BigQueryTestTable.java  |  45 ------
 .../bigquery/BigQueryTestTableProvider.java        |  71 ---------
 .../sql/meta/provider/pubsub/PubsubJsonIT.java     |   2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  |  23 ---
 18 files changed, 29 insertions(+), 519 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 8bdb1bf..5e44c6c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */
 @Experimental
@@ -35,17 +34,15 @@ public class BeamSqlCli {
   private MetaStore metaStore;
 
   public BeamSqlCli metaStore(MetaStore metaStore) {
-    return metaStore(metaStore, false, PipelineOptionsFactory.create());
+    return metaStore(metaStore, false);
   }
 
-  public BeamSqlCli metaStore(
-      MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) {
+  public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
     this.metaStore = metaStore;
     BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
     if (autoLoadUdfUdaf) {
       builder.autoLoadUserDefinedFunctions();
     }
-    builder.setPipelineOptions(pipelineOptions);
     this.env = builder.build();
     return this;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 63f7158..14f1b80 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -38,9 +36,4 @@ public interface BeamSqlTable {
 
   /** Get the schema info of the table. */
   Schema getSchema();
-
-  /** Estimates the number of rows or returns null if there is no estimation. */
-  default BeamRowCountStatistics getRowCount(PipelineOptions options) {
-    return BeamRowCountStatistics.UNKNOWN;
-  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index afa4438..e45daca 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -118,8 +118,6 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
     sqlEnvBuilder.setQueryPlannerClassName(
         input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName());
 
-    sqlEnvBuilder.setPipelineOptions(input.getPipeline().getOptions());
-
     BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
     return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString()));
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
index 339810c..f6a016b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
@@ -51,8 +51,8 @@ import org.apache.calcite.schema.Table;
  * a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection
  * properties).
  *
- * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider,
- * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers.
+ * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid
+ * loading all available table providers.
  */
 class BeamCalciteSchemaFactory {
 
@@ -97,10 +97,10 @@ class BeamCalciteSchemaFactory {
   }
 
   /**
-   * This is the override to create an empty schema, used in {@link JdbcDriver#connect(TableProvider
-   * , org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all table providers. This
-   * schema is expected to be replaced by an actual functional schema by the same code that
-   * specified this override in the first place.
+   * This is the override to create an empty schema, used in {@link
+   * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is
+   * expected to be replaced by an actual functional schema by the same code that specified this
+   * override in the first place.
    */
   public static class Empty extends InitialEmptySchema implements SchemaFactory {
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index e53cb04..e800c82 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -21,11 +21,9 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.calcite.adapter.java.AbstractQueryableTable;
 import org.apache.calcite.linq4j.QueryProvider;
@@ -40,8 +38,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.ModifiableTable;
 import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.TranslatableTable;
 
 /** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -66,25 +62,6 @@ public class BeamCalciteTable extends AbstractQueryableTable
   }
 
   @Override
-  public Statistic getStatistic() {
-    /*
-     Changing class loader is required for the JDBC path. It is similar to what done in
-     {@link BeamEnumerableConverter#toRowList} and {@link BeamEnumerableConverter#toEnumerable }.
-    */
-    final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
-    try {
-      Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
-      BeamRowCountStatistics beamStatistics =
-          beamTable.getRowCount(BeamEnumerableConverter.createPipelineOptions(pipelineOptions));
-      return beamStatistics.isUnknown()
-          ? Statistics.UNKNOWN
-          : Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of());
-    } finally {
-      Thread.currentThread().setContextClassLoader(originalClassLoader);
-    }
-  }
-
-  @Override
   public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
     return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptions);
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
deleted file mode 100644
index ac0431d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-
-/** This class stores row count statistics. */
-public class BeamRowCountStatistics implements Serializable {
-  public static final BeamRowCountStatistics UNKNOWN = new BeamRowCountStatistics(null);
-  private final BigInteger rowCount;
-
-  private BeamRowCountStatistics(BigInteger rowCount) {
-    this.rowCount = rowCount;
-  }
-
-  public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger rowCount) {
-    return new BeamRowCountStatistics(rowCount);
-  }
-
-  /** Is true if the row count cannot be estimated. */
-  public boolean isUnknown() {
-    return rowCount == null;
-  }
-
-  public BigInteger getRowCount() {
-    return rowCount;
-  }
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index ae4238d..02b3e69 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -38,8 +38,6 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
@@ -68,26 +66,14 @@ public class BeamSqlEnv {
     return new BeamSqlEnvBuilder(tableProvider);
   }
 
-  /**
-   * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
-   * Pipeline Options. It should only be used in tests.
-   */
   public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
     return withTableProvider(new ReadOnlyTableProvider(tableType, tables));
   }
 
-  /**
-   * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
-   * Pipeline Options. It should only be used in tests.
-   */
   public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
-    return builder(tableProvider).setPipelineOptions(PipelineOptionsFactory.create()).build();
+    return builder(tableProvider).build();
   }
 
-  /**
-   * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty *
-   * Pipeline Options. It should only be used in tests.
-   */
   public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
     InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
     for (TableProvider tableProvider : tableProviders) {
@@ -137,7 +123,6 @@ public class BeamSqlEnv {
     private Set<Map.Entry<String, Function>> functionSet;
     private boolean autoLoadBuiltinFunctions;
     private boolean autoLoadUdfs;
-    private PipelineOptions pipelineOptions;
 
     private BeamSqlEnvBuilder(TableProvider tableProvider) {
       checkNotNull(tableProvider, "Table provider for the default schema must be sets.");
@@ -148,7 +133,6 @@ public class BeamSqlEnv {
       functionSet = new HashSet<>();
       autoLoadUdfs = false;
       autoLoadBuiltinFunctions = false;
-      pipelineOptions = null;
     }
 
     /** Add a top-level schema backed by the table provider. */
@@ -210,20 +194,14 @@ public class BeamSqlEnv {
       return this;
     }
 
-    public BeamSqlEnvBuilder setPipelineOptions(PipelineOptions pipelineOptions) {
-      this.pipelineOptions = pipelineOptions;
-      return this;
-    }
-
     /**
      * Build function to create an instance of BeamSqlEnv based on preset fields.
      *
      * @return BeamSqlEnv.
      */
     public BeamSqlEnv build() {
-      checkNotNull(pipelineOptions);
 
-      JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider, pipelineOptions);
+      JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider);
 
       configureSchemas(jdbcConnection);
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index 4dfabb8..bb7cc42 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -20,14 +20,11 @@ package org.apache.beam.sdk.extensions.sql.impl;
 import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
 import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.service.AutoService;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.function.Consumer;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -103,8 +100,6 @@ public class JdbcDriver extends Driver {
     INSTANCE.register();
   }
 
-  public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
   @Override
   protected AvaticaFactory createFactory() {
     return JdbcFactory.wrap((CalciteFactory) super.createFactory());
@@ -146,7 +141,7 @@ public class JdbcDriver extends Driver {
    * not this path. The CLI ends up using the schema factory that populates the default schema with
    * all table providers it can find. See {@link BeamCalciteSchemaFactory}.
    */
-  public static JdbcConnection connect(TableProvider tableProvider, PipelineOptions options) {
+  public static JdbcConnection connect(TableProvider tableProvider) {
     try {
       Properties properties = new Properties();
       properties.setProperty(
@@ -154,36 +149,9 @@ public class JdbcDriver extends Driver {
       JdbcConnection connection =
           (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
       connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
-      connection.setPipelineOptionsMap(getOptionsMap(options));
       return connection;
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
-
-  /** Converts {@link PipelineOptions} to its map format. */
-  private static Map<String, String> getOptionsMap(PipelineOptions options) {
-    Map map = OBJECT_MAPPER.convertValue(options, Map.class);
-
-    map = (Map) map.get("options");
-    if (map == null) {
-      map = new HashMap();
-    }
-
-    Map<String, String> optionMap = new HashMap<>();
-    for (Object entry : map.entrySet()) {
-      Map.Entry ent = (Map.Entry) entry;
-      String value;
-      try {
-        value =
-            (ent.getValue() instanceof String)
-                ? ent.getValue().toString()
-                : OBJECT_MAPPER.writeValueAsString(ent.getValue());
-      } catch (Exception e) {
-        throw new IllegalArgumentException("Unable to parse Pipeline Options", e);
-      }
-      optionMap.put(ent.getKey().toString(), value);
-    }
-    return optionMap;
-  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 222e4cf..621f149 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -17,27 +17,19 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 
-import java.io.IOException;
 import java.io.Serializable;
-import java.math.BigInteger;
 import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@code BigQueryTable} represent a BigQuery table as a target. This provider does not currently
@@ -47,8 +39,6 @@ import org.slf4j.LoggerFactory;
 class BigQueryTable extends BaseBeamTable implements Serializable {
   @VisibleForTesting final String bqLocation;
   private final ConversionOptions conversionOptions;
-  private BeamRowCountStatistics rowCountStatistics = null;
-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class);
 
   BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
     super(table.getSchema());
@@ -57,16 +47,6 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
   }
 
   @Override
-  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
-
-    if (rowCountStatistics == null) {
-      rowCountStatistics = getRowCountFromBQ(options, bqLocation);
-    }
-
-    return rowCountStatistics;
-  }
-
-  @Override
   public PCollection.IsBounded isBounded() {
     return PCollection.IsBounded.BOUNDED;
   }
@@ -92,23 +72,4 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
             .withFormatFunction(BigQueryUtils.toTableRow())
             .to(bqLocation));
   }
-
-  private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) {
-    try {
-      BigInteger rowCount =
-          BigQueryHelpers.getNumRows(
-              o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation));
-
-      if (rowCount == null) {
-        return BeamRowCountStatistics.UNKNOWN;
-      }
-
-      return BeamRowCountStatistics.createBoundedTableStatistics(rowCount);
-
-    } catch (IOException | InterruptedException e) {
-      LOGGER.warn("Could not get the row count for the table " + bqLocation, e);
-    }
-
-    return BeamRowCountStatistics.UNKNOWN;
-  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index 3b6dda0..517309d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -24,7 +24,6 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -43,7 +42,6 @@ public class BeamSqlEnvTest {
         BeamSqlEnv.builder(root)
             .addSchema("nested", nested)
             .addSchema("anotherOne", anotherOne)
-            .setPipelineOptions(PipelineOptionsFactory.create())
             .build();
 
     Connection connection = env.connection;
@@ -62,9 +60,6 @@ public class BeamSqlEnvTest {
     exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
 
     TestTableProvider root = new TestTableProvider();
-    BeamSqlEnv.builder(root)
-        .setQueryPlannerClassName("org.test.ClassNotFound")
-        .setPipelineOptions(PipelineOptionsFactory.create())
-        .build();
+    BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build();
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 3272d00..6f36173 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -46,7 +46,6 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.Row;
@@ -198,7 +197,7 @@ public class JdbcDriverTest {
   @Test
   public void testSelectsFromExistingTable() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    Connection connection = JdbcDriver.connect(tableProvider);
 
     connection
         .createStatement()
@@ -220,7 +219,7 @@ public class JdbcDriverTest {
   @Test
   public void testTimestampWithDefaultTimezone() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    Connection connection = JdbcDriver.connect(tableProvider);
 
     // A table with one TIMESTAMP column
     Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -251,7 +250,7 @@ public class JdbcDriverTest {
   public void testTimestampWithNonzeroTimezone() throws Exception {
     Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"), Locale.ROOT);
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    Connection connection = JdbcDriver.connect(tableProvider);
 
     // A table with one TIMESTAMP column
     Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -281,7 +280,7 @@ public class JdbcDriverTest {
   public void testTimestampWithZeroTimezone() throws Exception {
     Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    Connection connection = JdbcDriver.connect(tableProvider);
 
     // A table with one TIMESTAMP column
     Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -310,7 +309,7 @@ public class JdbcDriverTest {
   @Test
   public void testSelectsFromExistingComplexTable() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    Connection connection = JdbcDriver.connect(tableProvider);
 
     connection
         .createStatement()
@@ -344,7 +343,7 @@ public class JdbcDriverTest {
   @Test
   public void testInsertIntoCreatedTable() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    Connection connection = JdbcDriver.connect(tableProvider);
 
     connection
         .createStatement()
@@ -370,8 +369,7 @@ public class JdbcDriverTest {
 
   @Test
   public void testInternalConnect_boundedTable() throws Exception {
-    CalciteConnection connection =
-        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
     Statement statement = connection.createStatement();
     ResultSet resultSet = statement.executeQuery("SELECT * FROM test");
     assertTrue(resultSet.next());
@@ -394,8 +392,7 @@ public class JdbcDriverTest {
                     .addRows(1, "second first")
                     .addRows(2, "second")));
 
-    CalciteConnection connection =
-        JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    CalciteConnection connection = JdbcDriver.connect(tableProvider);
     Statement statement = connection.createStatement();
     ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 5");
     assertTrue(resultSet1.next());
@@ -435,8 +432,7 @@ public class JdbcDriverTest {
                     .timestampColumnIndex(3)
                     .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)));
 
-    CalciteConnection connection =
-        JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+    CalciteConnection connection = JdbcDriver.connect(tableProvider);
     Statement statement = connection.createStatement();
 
     ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 1");
@@ -474,8 +470,7 @@ public class JdbcDriverTest {
 
   @Test
   public void testInternalConnect_setDirectRunner() throws Exception {
-    CalciteConnection connection =
-        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
     Statement statement = connection.createStatement();
     assertEquals(0, statement.executeUpdate("SET runner = direct"));
     assertTrue(statement.execute("SELECT * FROM test"));
@@ -485,8 +480,7 @@ public class JdbcDriverTest {
   public void testInternalConnect_setBogusRunner() throws Exception {
     thrown.expectMessage("Unknown 'runner' specified 'bogus'");
 
-    CalciteConnection connection =
-        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
     Statement statement = connection.createStatement();
     assertEquals(0, statement.executeUpdate("SET runner = bogus"));
     assertTrue(statement.execute("SELECT * FROM test"));
@@ -494,8 +488,7 @@ public class JdbcDriverTest {
 
   @Test
   public void testInternalConnect_resetAll() throws Exception {
-    CalciteConnection connection =
-        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
     Statement statement = connection.createStatement();
     assertEquals(0, statement.executeUpdate("SET runner = bogus"));
     assertEquals(0, statement.executeUpdate("RESET ALL"));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index 5d6f460..b7f4215 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.junit.Test;
 
@@ -168,11 +167,7 @@ public class BeamDDLTest {
     TestTableProvider rootProvider = new TestTableProvider();
     TestTableProvider testProvider = new TestTableProvider();
 
-    BeamSqlEnv env =
-        BeamSqlEnv.builder(rootProvider)
-            .addSchema("test", testProvider)
-            .setPipelineOptions(PipelineOptionsFactory.create())
-            .build();
+    BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build();
     assertNull(testProvider.getTables().get("person"));
     env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 638c20f..22430ba 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -332,7 +331,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
      */
     public void check(Pipeline pipeline) throws Exception {
       checkPTransform(pipeline);
-      checkJdbc(pipeline.getOptions());
+      checkJdbc();
     }
 
     private static final Schema DUMMY_SCHEMA = Schema.builder().addBooleanField("dummy").build();
@@ -354,7 +353,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
                         Schema.FieldType.STRING, "name")
                     .addRows(1, "first")));
 
-    private void checkJdbc(PipelineOptions pipelineOptions) throws Exception {
+    private void checkJdbc() throws Exception {
       // Beam SQL code is only invoked when the calling convention insists on it, so we
       // have to express this as selecting from a Beam table, even though the contents are
       // irrelevant.
@@ -364,7 +363,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
       //
       // Here we create a Beam table just to force the calling convention.
       TestTableProvider tableProvider = new TestTableProvider();
-      Connection connection = JdbcDriver.connect(tableProvider, pipelineOptions);
+      Connection connection = JdbcDriver.connect(tableProvider);
       connection
           .createStatement()
           .executeUpdate("CREATE EXTERNAL TABLE dummy (dummy BOOLEAN) TYPE 'test'");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
deleted file mode 100644
index 23d09fc..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-
-import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
-import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
-import static org.apache.beam.sdk.schemas.Schema.toSchema;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import java.math.BigInteger;
-import java.util.stream.Stream;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Integration tests form writing to BigQuery with Beam SQL. */
-@RunWith(JUnit4.class)
-public class BigQueryRowCountIT {
-  private static final Schema SOURCE_SCHEMA =
-      Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build();
-  private static final String FAKE_JOB_NAME = "testPipelineOptionInjectionFakeJobName";
-
-  @Rule public transient TestPipeline pipeline = TestPipeline.create();
-  @Rule public transient TestPipeline readingPipeline = TestPipeline.create();
-  @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA);
-
-  @Test
-  public void testEmptyTable() {
-    BigQueryTableProvider provider = new BigQueryTableProvider();
-    Table table = getTable("testTable", bigQuery.tableSpec());
-    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-    BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
-    assertNotNull(size);
-    assertEquals(BigInteger.ZERO, size.getRowCount());
-  }
-
-  @Test
-  public void testNonEmptyTable() {
-    BigQueryTableProvider provider = new BigQueryTableProvider();
-    Table table = getTable("testTable", bigQuery.tableSpec());
-
-    pipeline
-        .apply(
-            Create.of(
-                    new TableRow().set("id", 1).set("name", "name1"),
-                    new TableRow().set("id", 2).set("name", "name2"),
-                    new TableRow().set("id", 3).set("name", "name3"))
-                .withCoder(TableRowJsonCoder.of()))
-        .apply(
-            BigQueryIO.writeTableRows()
-                .to(bigQuery.tableSpec())
-                .withSchema(
-                    new TableSchema()
-                        .setFields(
-                            ImmutableList.of(
-                                new TableFieldSchema().setName("id").setType("INTEGER"),
-                                new TableFieldSchema().setName("name").setType("STRING"))))
-                .withoutValidation());
-    pipeline.run().waitUntilFinish();
-
-    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-    BeamRowCountStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
-
-    assertNotNull(size1);
-    assertEquals(BigInteger.valueOf(3), size1.getRowCount());
-  }
-
-  /** This tests if the pipeline options are injected in the path of SQL Transform. */
-  @Test
-  public void testPipelineOptionInjection() {
-    BigQueryTestTableProvider provider = new BigQueryTestTableProvider();
-    Table table = getTable("testTable", bigQuery.tableSpec());
-    provider.addTable("testTable", table);
-
-    pipeline
-        .apply(
-            Create.of(
-                    new TableRow().set("id", 1).set("name", "name1"),
-                    new TableRow().set("id", 2).set("name", "name2"),
-                    new TableRow().set("id", 3).set("name", "name3"))
-                .withCoder(TableRowJsonCoder.of()))
-        .apply(
-            BigQueryIO.writeTableRows()
-                .to(bigQuery.tableSpec())
-                .withSchema(
-                    new TableSchema()
-                        .setFields(
-                            ImmutableList.of(
-                                new TableFieldSchema().setName("id").setType("INTEGER"),
-                                new TableFieldSchema().setName("name").setType("STRING"))))
-                .withoutValidation());
-    pipeline.run().waitUntilFinish();
-
-    // changing pipeline options
-    readingPipeline.getOptions().setJobName(FAKE_JOB_NAME);
-
-    // Reading from the table should update the statistics of bigQuery table
-    readingPipeline.apply(
-        SqlTransform.query(" select * from testTable ")
-            .withDefaultTableProvider("bigquery", provider));
-
-    readingPipeline.run().waitUntilFinish();
-
-    BigQueryTestTable sqlTable = (BigQueryTestTable) provider.buildBeamSqlTable(table);
-    assertEquals(FAKE_JOB_NAME, sqlTable.getJobName());
-  }
-
-  @Test
-  public void testFakeTable() {
-    BigQueryTableProvider provider = new BigQueryTableProvider();
-    Table table = getTable("fakeTable", "project:dataset.table");
-
-    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-    BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
-    assertTrue(size.isUnknown());
-  }
-
-  private static Table getTable(String name, String location) {
-    return Table.builder()
-        .name(name)
-        .comment(name + " table")
-        .location(location)
-        .schema(
-            Stream.of(Schema.Field.nullable("id", INT64), Schema.Field.nullable("name", STRING))
-                .collect(toSchema()))
-        .type("bigquery")
-        .build();
-  }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
deleted file mode 100644
index db954ae..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * A BigQueryTable that keeps jobName from the pipeline options whenever row count is called. It is
- * made for {@link BigQueryRowCountIT#testPipelineOptionInjection()}
- */
-public class BigQueryTestTable extends BigQueryTable {
-  private String jobName = null;
-
-  BigQueryTestTable(Table table, BigQueryUtils.ConversionOptions options) {
-    super(table, options);
-  }
-
-  @Override
-  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
-    jobName = options.getJobName();
-    return super.getRowCount(options);
-  }
-
-  String getJobName() {
-    return this.jobName;
-  }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
deleted file mode 100644
index d8656ea..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-
-/** A test table provider for BigQueryRowCountIT. */
-public class BigQueryTestTableProvider extends BigQueryTableProvider {
-
-  private Map<String, Table> tableSpecMap;
-  private Map<String, BeamSqlTable> beamSqlTableMap;
-
-  BigQueryTestTableProvider() {
-    super();
-    tableSpecMap = new HashMap<>();
-    beamSqlTableMap = new HashMap<>();
-  }
-
-  void addTable(String name, Table table) {
-    tableSpecMap.put(name, table);
-  }
-
-  @Nullable
-  @Override
-  public Table getTable(String tableName) {
-    return tableSpecMap.get(tableName);
-  }
-
-  @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    BeamSqlTable t = beamSqlTableMap.get(table.getLocation());
-    if (t != null) {
-      return t;
-    }
-
-    t =
-        new BigQueryTestTable(
-            table,
-            BigQueryUtils.ConversionOptions.builder()
-                .setTruncateTimestamps(
-                    firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false)
-                        ? BigQueryUtils.ConversionOptions.TruncateTimestamps.TRUNCATE
-                        : BigQueryUtils.ConversionOptions.TruncateTimestamps.REJECT)
-                .build());
-    beamSqlTableMap.put(table.getLocation(), t);
-
-    return t;
-  }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index 1e89240..e8fa135 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -348,7 +348,7 @@ public class PubsubJsonIT implements Serializable {
       inMemoryMetaStore.registerProvider(tableProvider);
     }
 
-    JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore, options);
+    JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore);
     connection.setPipelineOptionsMap(argsMap);
     return connection;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 6c23708..82bb5ad 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -25,14 +25,12 @@ import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -543,27 +541,6 @@ public class BigQueryHelpers {
     }
   }
 
-  /**
-   * It returns the number of rows for a given table.
-   *
-   * @param options
-   * @param tableRef
-   * @return The number of rows in the table.
-   * @throws InterruptedException
-   * @throws IOException
-   */
-  @Nullable
-  public static BigInteger getNumRows(BigQueryOptions options, TableReference tableRef)
-      throws InterruptedException, IOException {
-
-    DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options);
-    Table table = datasetService.getTable(tableRef);
-    if (table == null) {
-      return null;
-    }
-    return table.getNumRows();
-  }
-
   static String getDatasetLocation(
       DatasetService datasetService, String projectId, String datasetId) {
     Dataset dataset;