Posted to commits@beam.apache.org by an...@apache.org on 2019/06/17 18:38:21 UTC

[beam] branch master updated: [BEAM-7513] Implements row estimation for BigQuery.

This is an automated email from the ASF dual-hosted git repository.

anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bc2c286  [BEAM-7513] Implements row estimation for BigQuery.
     new 2de7d07  Merge pull request #8822 from riazela/bigquery-rowcount
bc2c286 is described below

commit bc2c2863e400f28a4ac14ffb07b03cfe8a312a23
Author: Alireza Samadian <al...@gmail.com>
AuthorDate: Tue Jun 11 09:09:23 2019 -0700

    [BEAM-7513] Implements row estimation for BigQuery.
---
 .../apache/beam/sdk/extensions/sql/BeamSqlCli.java |   7 +-
 .../beam/sdk/extensions/sql/BeamSqlTable.java      |   7 +
 .../beam/sdk/extensions/sql/SqlTransform.java      |   2 +
 .../sql/impl/BeamCalciteSchemaFactory.java         |  12 +-
 .../sdk/extensions/sql/impl/BeamCalciteTable.java  |  23 +++
 .../BeamRowCountStatistics.java}                   |  37 +++--
 .../beam/sdk/extensions/sql/impl/BeamSqlEnv.java   |  26 +++-
 .../beam/sdk/extensions/sql/impl/JdbcDriver.java   |  34 ++++-
 .../sql/meta/provider/bigquery/BigQueryTable.java  |  39 +++++
 .../sdk/extensions/sql/impl/BeamSqlEnvTest.java    |   7 +-
 .../sdk/extensions/sql/impl/JdbcDriverTest.java    |  31 ++--
 .../extensions/sql/impl/parser/BeamDDLTest.java    |   7 +-
 ...BeamSqlBuiltinFunctionsIntegrationTestBase.java |   7 +-
 .../meta/provider/bigquery/BigQueryRowCountIT.java | 161 +++++++++++++++++++++
 .../meta/provider/bigquery/BigQueryTestTable.java  |  45 ++++++
 .../bigquery/BigQueryTestTableProvider.java        |  71 +++++++++
 .../sql/meta/provider/pubsub/PubsubJsonIT.java     |   2 +-
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  |  23 +++
 18 files changed, 496 insertions(+), 45 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 5e44c6c..8bdb1bf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
 import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */
 @Experimental
@@ -34,15 +35,17 @@ public class BeamSqlCli {
   private MetaStore metaStore;
 
   public BeamSqlCli metaStore(MetaStore metaStore) {
-    return metaStore(metaStore, false);
+    return metaStore(metaStore, false, PipelineOptionsFactory.create());
   }
 
-  public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
+  public BeamSqlCli metaStore(
+      MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) {
     this.metaStore = metaStore;
     BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
     if (autoLoadUdfUdaf) {
       builder.autoLoadUserDefinedFunctions();
     }
+    builder.setPipelineOptions(pipelineOptions);
     this.env = builder.build();
     return this;
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 14f1b80..63f7158 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql;
 
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -36,4 +38,9 @@ public interface BeamSqlTable {
 
   /** Get the schema info of the table. */
   Schema getSchema();
+
+  /** Estimates the number of rows, or returns {@code UNKNOWN} if no estimate is available. */
+  default BeamRowCountStatistics getRowCount(PipelineOptions options) {
+    return BeamRowCountStatistics.UNKNOWN;
+  }
 }
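
For context only (not part of the patch): a minimal sketch of how a table implementation could override the new default hook. The class name FixedRowCountTable and the fixed count are hypothetical; reads and writes are stubbed out because only the statistics hook matters here.

    import java.math.BigInteger;
    import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
    import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.POutput;
    import org.apache.beam.sdk.values.Row;

    /** Hypothetical table whose row count is known ahead of time. */
    class FixedRowCountTable implements BeamSqlTable {
      private final Schema schema = Schema.builder().addInt64Field("id").build();

      @Override
      public PCollection<Row> buildIOReader(PBegin begin) {
        throw new UnsupportedOperationException("reads are out of scope for this sketch");
      }

      @Override
      public POutput buildIOWriter(PCollection<Row> input) {
        throw new UnsupportedOperationException("writes are out of scope for this sketch");
      }

      @Override
      public PCollection.IsBounded isBounded() {
        return PCollection.IsBounded.BOUNDED;
      }

      @Override
      public Schema getSchema() {
        return schema;
      }

      @Override
      public BeamRowCountStatistics getRowCount(PipelineOptions options) {
        // The size is known up front, so report it instead of the UNKNOWN default.
        return BeamRowCountStatistics.createBoundedTableStatistics(BigInteger.valueOf(1000));
      }
    }
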
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index e45daca..afa4438 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -118,6 +118,8 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
     sqlEnvBuilder.setQueryPlannerClassName(
         input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName());
 
+    sqlEnvBuilder.setPipelineOptions(input.getPipeline().getOptions());
+
     BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
     return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString()));
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
index f6a016b..339810c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
@@ -51,8 +51,8 @@ import org.apache.calcite.schema.Table;
  * a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection
  * properties).
  *
- * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid
- * loading all available table providers.
+ * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider,
+ * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers.
  */
 class BeamCalciteSchemaFactory {
 
@@ -97,10 +97,10 @@ class BeamCalciteSchemaFactory {
   }
 
   /**
-   * This is the override to create an empty schema, used in {@link
-   * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is
-   * expected to be replaced by an actual functional schema by the same code that specified this
-   * override in the first place.
+   * This is the override to create an empty schema, used in {@link JdbcDriver#connect(TableProvider
+   * , org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all table providers. This
+   * schema is expected to be replaced by an actual functional schema by the same code that
+   * specified this override in the first place.
    */
   public static class Empty extends InitialEmptySchema implements SchemaFactory {
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index e800c82..e53cb04 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -21,9 +21,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.apache.calcite.adapter.java.AbstractQueryableTable;
 import org.apache.calcite.linq4j.QueryProvider;
@@ -38,6 +40,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.schema.ModifiableTable;
 import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
 import org.apache.calcite.schema.TranslatableTable;
 
 /** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -62,6 +66,25 @@ public class BeamCalciteTable extends AbstractQueryableTable
   }
 
   @Override
+  public Statistic getStatistic() {
+    /*
+     Changing the class loader is required for the JDBC path. It is similar to what is done in
+     {@link BeamEnumerableConverter#toRowList} and {@link BeamEnumerableConverter#toEnumerable}.
+    */
+    final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
+      BeamRowCountStatistics beamStatistics =
+          beamTable.getRowCount(BeamEnumerableConverter.createPipelineOptions(pipelineOptions));
+      return beamStatistics.isUnknown()
+          ? Statistics.UNKNOWN
+          : Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of());
+    } finally {
+      Thread.currentThread().setContextClassLoader(originalClassLoader);
+    }
+  }
+
+  @Override
   public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
     return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptions);
   }
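
As an aside (not part of the patch), the Calcite side of this bridge behaves roughly as sketched below; the class name is hypothetical and the expected outputs assume the Calcite Statistics helpers referenced in the hunk above.

    import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
    import org.apache.calcite.schema.Statistic;
    import org.apache.calcite.schema.Statistics;

    public class CalciteStatisticSketch {
      public static void main(String[] args) {
        // A known bounded row count, with no unique-key information.
        Statistic known = Statistics.of(3d, ImmutableList.of());
        System.out.println(known.getRowCount()); // 3.0

        // UNKNOWN is what BeamCalciteTable falls back to when the table cannot estimate its size.
        System.out.println(Statistics.UNKNOWN.getRowCount()); // null
      }
    }
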
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
similarity index 51%
copy from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
copy to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
index 14f1b80..ac0431d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
@@ -15,25 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql;
+package org.apache.beam.sdk.extensions.sql.impl;
 
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
+import java.io.Serializable;
+import java.math.BigInteger;
 
-/** This interface defines a Beam Sql Table. */
-public interface BeamSqlTable {
-  /** create a {@code PCollection<Row>} from source. */
-  PCollection<Row> buildIOReader(PBegin begin);
+/** This class stores row count statistics. */
+public class BeamRowCountStatistics implements Serializable {
+  public static final BeamRowCountStatistics UNKNOWN = new BeamRowCountStatistics(null);
+  private final BigInteger rowCount;
 
-  /** create a {@code IO.write()} instance to write to target. */
-  POutput buildIOWriter(PCollection<Row> input);
+  private BeamRowCountStatistics(BigInteger rowCount) {
+    this.rowCount = rowCount;
+  }
 
-  /** Whether this table is bounded (known to be finite) or unbounded (may or may not be finite). */
-  PCollection.IsBounded isBounded();
+  public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger rowCount) {
+    return new BeamRowCountStatistics(rowCount);
+  }
 
-  /** Get the schema info of the table. */
-  Schema getSchema();
+  /** Returns true if the row count cannot be estimated. */
+  public boolean isUnknown() {
+    return rowCount == null;
+  }
+
+  public BigInteger getRowCount() {
+    return rowCount;
+  }
 }
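
A short usage sketch (illustrative, not part of the patch) of the new value class exactly as defined above; the class name RowCountStatisticsSketch is made up for the example.

    import java.math.BigInteger;
    import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;

    public class RowCountStatisticsSketch {
      public static void main(String[] args) {
        BeamRowCountStatistics known =
            BeamRowCountStatistics.createBoundedTableStatistics(BigInteger.valueOf(3));

        // Callers check isUnknown() before reading the count.
        System.out.println(known.isUnknown());   // false
        System.out.println(known.getRowCount()); // 3

        // UNKNOWN carries a null row count and reports itself as unknown.
        System.out.println(BeamRowCountStatistics.UNKNOWN.isUnknown()); // true
      }
    }
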
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 02b3e69..ae4238d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
@@ -66,14 +68,26 @@ public class BeamSqlEnv {
     return new BeamSqlEnvBuilder(tableProvider);
   }
 
+  /**
+   * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
+   * Pipeline Options. It should only be used in tests.
+   */
   public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
     return withTableProvider(new ReadOnlyTableProvider(tableType, tables));
   }
 
+  /**
+   * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
+   * Pipeline Options. It should only be used in tests.
+   */
   public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
-    return builder(tableProvider).build();
+    return builder(tableProvider).setPipelineOptions(PipelineOptionsFactory.create()).build();
   }
 
+  /**
+   * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
+   * Pipeline Options. It should only be used in tests.
+   */
   public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
     InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
     for (TableProvider tableProvider : tableProviders) {
@@ -123,6 +137,7 @@ public class BeamSqlEnv {
     private Set<Map.Entry<String, Function>> functionSet;
     private boolean autoLoadBuiltinFunctions;
     private boolean autoLoadUdfs;
+    private PipelineOptions pipelineOptions;
 
     private BeamSqlEnvBuilder(TableProvider tableProvider) {
       checkNotNull(tableProvider, "Table provider for the default schema must be set.");
@@ -133,6 +148,7 @@ public class BeamSqlEnv {
       functionSet = new HashSet<>();
       autoLoadUdfs = false;
       autoLoadBuiltinFunctions = false;
+      pipelineOptions = null;
     }
 
     /** Add a top-level schema backed by the table provider. */
@@ -194,14 +210,20 @@ public class BeamSqlEnv {
       return this;
     }
 
+    public BeamSqlEnvBuilder setPipelineOptions(PipelineOptions pipelineOptions) {
+      this.pipelineOptions = pipelineOptions;
+      return this;
+    }
+
     /**
      * Build function to create an instance of BeamSqlEnv based on preset fields.
      *
      * @return BeamSqlEnv.
      */
     public BeamSqlEnv build() {
+      checkNotNull(pipelineOptions);
 
-      JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider);
+      JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider, pipelineOptions);
 
       configureSchemas(jdbcConnection);
 
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index bb7cc42..4dfabb8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -20,11 +20,14 @@ package org.apache.beam.sdk.extensions.sql.impl;
 import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
 import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.service.AutoService;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.function.Consumer;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -100,6 +103,8 @@ public class JdbcDriver extends Driver {
     INSTANCE.register();
   }
 
+  public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
   @Override
   protected AvaticaFactory createFactory() {
     return JdbcFactory.wrap((CalciteFactory) super.createFactory());
@@ -141,7 +146,7 @@ public class JdbcDriver extends Driver {
    * not this path. The CLI ends up using the schema factory that populates the default schema with
    * all table providers it can find. See {@link BeamCalciteSchemaFactory}.
    */
-  public static JdbcConnection connect(TableProvider tableProvider) {
+  public static JdbcConnection connect(TableProvider tableProvider, PipelineOptions options) {
     try {
       Properties properties = new Properties();
       properties.setProperty(
@@ -149,9 +154,36 @@ public class JdbcDriver extends Driver {
       JdbcConnection connection =
           (JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
       connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
+      connection.setPipelineOptionsMap(getOptionsMap(options));
       return connection;
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
+
+  /** Converts {@link PipelineOptions} to its map format. */
+  private static Map<String, String> getOptionsMap(PipelineOptions options) {
+    Map map = OBJECT_MAPPER.convertValue(options, Map.class);
+
+    map = (Map) map.get("options");
+    if (map == null) {
+      map = new HashMap();
+    }
+
+    Map<String, String> optionMap = new HashMap<>();
+    for (Object entry : map.entrySet()) {
+      Map.Entry ent = (Map.Entry) entry;
+      String value;
+      try {
+        value =
+            (ent.getValue() instanceof String)
+                ? ent.getValue().toString()
+                : OBJECT_MAPPER.writeValueAsString(ent.getValue());
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Unable to parse Pipeline Options", e);
+      }
+      optionMap.put(ent.getKey().toString(), value);
+    }
+    return optionMap;
+  }
 }
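
For reference (not part of the patch), the updated entry point can be exercised the same way the tests below do; a minimal sketch, assuming the in-memory TestTableProvider is on the classpath and using a purely illustrative job name.

    import java.sql.Connection;
    import java.sql.Statement;
    import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
    import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ConnectSketch {
      public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setJobName("row-count-demo"); // hypothetical job name, for illustration only

        // The options are flattened into the connection's pipeline-options map on connect.
        Connection connection = JdbcDriver.connect(new TestTableProvider(), options);
        try (Statement statement = connection.createStatement()) {
          statement.executeUpdate("CREATE EXTERNAL TABLE dummy (dummy BOOLEAN) TYPE 'test'");
        }
      }
    }
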
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 621f149..222e4cf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -17,19 +17,27 @@
  */
 package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
 
+import java.io.IOException;
 import java.io.Serializable;
+import java.math.BigInteger;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
 import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * {@code BigQueryTable} represents a BigQuery table as a target. This provider does not currently
@@ -39,6 +47,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
 class BigQueryTable extends BaseBeamTable implements Serializable {
   @VisibleForTesting final String bqLocation;
   private final ConversionOptions conversionOptions;
+  private BeamRowCountStatistics rowCountStatistics = null;
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class);
 
   BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
     super(table.getSchema());
@@ -47,6 +57,16 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
   }
 
   @Override
+  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+
+    if (rowCountStatistics == null) {
+      rowCountStatistics = getRowCountFromBQ(options, bqLocation);
+    }
+
+    return rowCountStatistics;
+  }
+
+  @Override
   public PCollection.IsBounded isBounded() {
     return PCollection.IsBounded.BOUNDED;
   }
@@ -72,4 +92,23 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
             .withFormatFunction(BigQueryUtils.toTableRow())
             .to(bqLocation));
   }
+
+  private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) {
+    try {
+      BigInteger rowCount =
+          BigQueryHelpers.getNumRows(
+              o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation));
+
+      if (rowCount == null) {
+        return BeamRowCountStatistics.UNKNOWN;
+      }
+
+      return BeamRowCountStatistics.createBoundedTableStatistics(rowCount);
+
+    } catch (IOException | InterruptedException e) {
+      LOGGER.warn("Could not get the row count for the table " + bqLocation, e);
+    }
+
+    return BeamRowCountStatistics.UNKNOWN;
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index 517309d..3b6dda0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -24,6 +24,7 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -42,6 +43,7 @@ public class BeamSqlEnvTest {
         BeamSqlEnv.builder(root)
             .addSchema("nested", nested)
             .addSchema("anotherOne", anotherOne)
+            .setPipelineOptions(PipelineOptionsFactory.create())
             .build();
 
     Connection connection = env.connection;
@@ -60,6 +62,9 @@ public class BeamSqlEnvTest {
     exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
 
     TestTableProvider root = new TestTableProvider();
-    BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build();
+    BeamSqlEnv.builder(root)
+        .setQueryPlannerClassName("org.test.ClassNotFound")
+        .setPipelineOptions(PipelineOptionsFactory.create())
+        .build();
   }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 6f36173..3272d00 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.values.Row;
@@ -197,7 +198,7 @@ public class JdbcDriverTest {
   @Test
   public void testSelectsFromExistingTable() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider);
+    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
 
     connection
         .createStatement()
@@ -219,7 +220,7 @@ public class JdbcDriverTest {
   @Test
   public void testTimestampWithDefaultTimezone() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider);
+    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
 
     // A table with one TIMESTAMP column
     Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -250,7 +251,7 @@ public class JdbcDriverTest {
   public void testTimestampWithNonzeroTimezone() throws Exception {
     Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"), Locale.ROOT);
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider);
+    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
 
     // A table with one TIMESTAMP column
     Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -280,7 +281,7 @@ public class JdbcDriverTest {
   public void testTimestampWithZeroTimezone() throws Exception {
     Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider);
+    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
 
     // A table with one TIMESTAMP column
     Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -309,7 +310,7 @@ public class JdbcDriverTest {
   @Test
   public void testSelectsFromExistingComplexTable() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider);
+    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
 
     connection
         .createStatement()
@@ -343,7 +344,7 @@ public class JdbcDriverTest {
   @Test
   public void testInsertIntoCreatedTable() throws Exception {
     TestTableProvider tableProvider = new TestTableProvider();
-    Connection connection = JdbcDriver.connect(tableProvider);
+    Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
 
     connection
         .createStatement()
@@ -369,7 +370,8 @@ public class JdbcDriverTest {
 
   @Test
   public void testInternalConnect_boundedTable() throws Exception {
-    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+    CalciteConnection connection =
+        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
     Statement statement = connection.createStatement();
     ResultSet resultSet = statement.executeQuery("SELECT * FROM test");
     assertTrue(resultSet.next());
@@ -392,7 +394,8 @@ public class JdbcDriverTest {
                     .addRows(1, "second first")
                     .addRows(2, "second")));
 
-    CalciteConnection connection = JdbcDriver.connect(tableProvider);
+    CalciteConnection connection =
+        JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
     Statement statement = connection.createStatement();
     ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 5");
     assertTrue(resultSet1.next());
@@ -432,7 +435,8 @@ public class JdbcDriverTest {
                     .timestampColumnIndex(3)
                     .addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)));
 
-    CalciteConnection connection = JdbcDriver.connect(tableProvider);
+    CalciteConnection connection =
+        JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
     Statement statement = connection.createStatement();
 
     ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 1");
@@ -470,7 +474,8 @@ public class JdbcDriverTest {
 
   @Test
   public void testInternalConnect_setDirectRunner() throws Exception {
-    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+    CalciteConnection connection =
+        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
     Statement statement = connection.createStatement();
     assertEquals(0, statement.executeUpdate("SET runner = direct"));
     assertTrue(statement.execute("SELECT * FROM test"));
@@ -480,7 +485,8 @@ public class JdbcDriverTest {
   public void testInternalConnect_setBogusRunner() throws Exception {
     thrown.expectMessage("Unknown 'runner' specified 'bogus'");
 
-    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+    CalciteConnection connection =
+        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
     Statement statement = connection.createStatement();
     assertEquals(0, statement.executeUpdate("SET runner = bogus"));
     assertTrue(statement.execute("SELECT * FROM test"));
@@ -488,7 +494,8 @@ public class JdbcDriverTest {
 
   @Test
   public void testInternalConnect_resetAll() throws Exception {
-    CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+    CalciteConnection connection =
+        JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
     Statement statement = connection.createStatement();
     assertEquals(0, statement.executeUpdate("SET runner = bogus"));
     assertEquals(0, statement.executeUpdate("RESET ALL"));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index b7f4215..5d6f460 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
 import org.junit.Test;
 
@@ -167,7 +168,11 @@ public class BeamDDLTest {
     TestTableProvider rootProvider = new TestTableProvider();
     TestTableProvider testProvider = new TestTableProvider();
 
-    BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build();
+    BeamSqlEnv env =
+        BeamSqlEnv.builder(rootProvider)
+            .addSchema("test", testProvider)
+            .setPipelineOptions(PipelineOptionsFactory.create())
+            .build();
     assertNull(testProvider.getTables().get("person"));
     env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");
 
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 22430ba..638c20f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -331,7 +332,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
      */
     public void check(Pipeline pipeline) throws Exception {
       checkPTransform(pipeline);
-      checkJdbc();
+      checkJdbc(pipeline.getOptions());
     }
 
     private static final Schema DUMMY_SCHEMA = Schema.builder().addBooleanField("dummy").build();
@@ -353,7 +354,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
                         Schema.FieldType.STRING, "name")
                     .addRows(1, "first")));
 
-    private void checkJdbc() throws Exception {
+    private void checkJdbc(PipelineOptions pipelineOptions) throws Exception {
       // Beam SQL code is only invoked when the calling convention insists on it, so we
       // have to express this as selecting from a Beam table, even though the contents are
       // irrelevant.
@@ -363,7 +364,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
       //
       // Here we create a Beam table just to force the calling convention.
       TestTableProvider tableProvider = new TestTableProvider();
-      Connection connection = JdbcDriver.connect(tableProvider);
+      Connection connection = JdbcDriver.connect(tableProvider, pipelineOptions);
       connection
           .createStatement()
           .executeUpdate("CREATE EXTERNAL TABLE dummy (dummy BOOLEAN) TYPE 'test'");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
new file mode 100644
index 0000000..23d09fc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.math.BigInteger;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for row count estimation on BigQuery tables with Beam SQL. */
+@RunWith(JUnit4.class)
+public class BigQueryRowCountIT {
+  private static final Schema SOURCE_SCHEMA =
+      Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build();
+  private static final String FAKE_JOB_NAME = "testPipelineOptionInjectionFakeJobName";
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public transient TestPipeline readingPipeline = TestPipeline.create();
+  @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA);
+
+  @Test
+  public void testEmptyTable() {
+    BigQueryTableProvider provider = new BigQueryTableProvider();
+    Table table = getTable("testTable", bigQuery.tableSpec());
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+    BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+    assertNotNull(size);
+    assertEquals(BigInteger.ZERO, size.getRowCount());
+  }
+
+  @Test
+  public void testNonEmptyTable() {
+    BigQueryTableProvider provider = new BigQueryTableProvider();
+    Table table = getTable("testTable", bigQuery.tableSpec());
+
+    pipeline
+        .apply(
+            Create.of(
+                    new TableRow().set("id", 1).set("name", "name1"),
+                    new TableRow().set("id", 2).set("name", "name2"),
+                    new TableRow().set("id", 3).set("name", "name3"))
+                .withCoder(TableRowJsonCoder.of()))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to(bigQuery.tableSpec())
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("id").setType("INTEGER"),
+                                new TableFieldSchema().setName("name").setType("STRING"))))
+                .withoutValidation());
+    pipeline.run().waitUntilFinish();
+
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+    BeamRowCountStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+
+    assertNotNull(size1);
+    assertEquals(BigInteger.valueOf(3), size1.getRowCount());
+  }
+
+  /** This tests if the pipeline options are injected in the path of SQL Transform. */
+  @Test
+  public void testPipelineOptionInjection() {
+    BigQueryTestTableProvider provider = new BigQueryTestTableProvider();
+    Table table = getTable("testTable", bigQuery.tableSpec());
+    provider.addTable("testTable", table);
+
+    pipeline
+        .apply(
+            Create.of(
+                    new TableRow().set("id", 1).set("name", "name1"),
+                    new TableRow().set("id", 2).set("name", "name2"),
+                    new TableRow().set("id", 3).set("name", "name3"))
+                .withCoder(TableRowJsonCoder.of()))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to(bigQuery.tableSpec())
+                .withSchema(
+                    new TableSchema()
+                        .setFields(
+                            ImmutableList.of(
+                                new TableFieldSchema().setName("id").setType("INTEGER"),
+                                new TableFieldSchema().setName("name").setType("STRING"))))
+                .withoutValidation());
+    pipeline.run().waitUntilFinish();
+
+    // changing pipeline options
+    readingPipeline.getOptions().setJobName(FAKE_JOB_NAME);
+
+    // Reading from the table should update the statistics of the BigQuery table.
+    readingPipeline.apply(
+        SqlTransform.query(" select * from testTable ")
+            .withDefaultTableProvider("bigquery", provider));
+
+    readingPipeline.run().waitUntilFinish();
+
+    BigQueryTestTable sqlTable = (BigQueryTestTable) provider.buildBeamSqlTable(table);
+    assertEquals(FAKE_JOB_NAME, sqlTable.getJobName());
+  }
+
+  @Test
+  public void testFakeTable() {
+    BigQueryTableProvider provider = new BigQueryTableProvider();
+    Table table = getTable("fakeTable", "project:dataset.table");
+
+    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+    BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+    assertTrue(size.isUnknown());
+  }
+
+  private static Table getTable(String name, String location) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .location(location)
+        .schema(
+            Stream.of(Schema.Field.nullable("id", INT64), Schema.Field.nullable("name", STRING))
+                .collect(toSchema()))
+        .type("bigquery")
+        .build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
new file mode 100644
index 0000000..db954ae
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A BigQueryTable that records the jobName from the pipeline options whenever its row count is
+ * requested. It is used by {@link BigQueryRowCountIT#testPipelineOptionInjection()}.
+ */
+public class BigQueryTestTable extends BigQueryTable {
+  private String jobName = null;
+
+  BigQueryTestTable(Table table, BigQueryUtils.ConversionOptions options) {
+    super(table, options);
+  }
+
+  @Override
+  public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+    jobName = options.getJobName();
+    return super.getRowCount(options);
+  }
+
+  String getJobName() {
+    return this.jobName;
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
new file mode 100644
index 0000000..d8656ea
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+
+/** A test table provider for BigQueryRowCountIT. */
+public class BigQueryTestTableProvider extends BigQueryTableProvider {
+
+  private Map<String, Table> tableSpecMap;
+  private Map<String, BeamSqlTable> beamSqlTableMap;
+
+  BigQueryTestTableProvider() {
+    super();
+    tableSpecMap = new HashMap<>();
+    beamSqlTableMap = new HashMap<>();
+  }
+
+  void addTable(String name, Table table) {
+    tableSpecMap.put(name, table);
+  }
+
+  @Nullable
+  @Override
+  public Table getTable(String tableName) {
+    return tableSpecMap.get(tableName);
+  }
+
+  @Override
+  public BeamSqlTable buildBeamSqlTable(Table table) {
+    BeamSqlTable t = beamSqlTableMap.get(table.getLocation());
+    if (t != null) {
+      return t;
+    }
+
+    t =
+        new BigQueryTestTable(
+            table,
+            BigQueryUtils.ConversionOptions.builder()
+                .setTruncateTimestamps(
+                    firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false)
+                        ? BigQueryUtils.ConversionOptions.TruncateTimestamps.TRUNCATE
+                        : BigQueryUtils.ConversionOptions.TruncateTimestamps.REJECT)
+                .build());
+    beamSqlTableMap.put(table.getLocation(), t);
+
+    return t;
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index e8fa135..1e89240 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -348,7 +348,7 @@ public class PubsubJsonIT implements Serializable {
       inMemoryMetaStore.registerProvider(tableProvider);
     }
 
-    JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore);
+    JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore, options);
     connection.setPipelineOptionsMap(argsMap);
     return connection;
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 82bb5ad..6c23708 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -25,12 +25,14 @@ import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
 import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import java.io.IOException;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -541,6 +543,27 @@ public class BigQueryHelpers {
     }
   }
 
+  /**
+   * Returns the number of rows in the given table, or null if the table is not found.
+   *
+   * @param options the BigQuery pipeline options used to build the dataset service.
+   * @param tableRef a reference to the table whose row count is requested.
+   * @return The number of rows in the table.
+   * @throws InterruptedException if the request to the BigQuery service is interrupted.
+   * @throws IOException if the table metadata cannot be fetched.
+   */
+  @Nullable
+  public static BigInteger getNumRows(BigQueryOptions options, TableReference tableRef)
+      throws InterruptedException, IOException {
+
+    DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options);
+    Table table = datasetService.getTable(tableRef);
+    if (table == null) {
+      return null;
+    }
+    return table.getNumRows();
+  }
+
   static String getDatasetLocation(
       DatasetService datasetService, String projectId, String datasetId) {
     Dataset dataset;