You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/06/17 18:38:21 UTC
[beam] branch master updated: [BEAM-7513] Implements row estimation
for BigQuery.
This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bc2c286 [BEAM-7513] Implements row estimation for BigQuery.
new 2de7d07 Merge pull request #8822 from riazela/bigquery-rowcount
bc2c286 is described below
commit bc2c2863e400f28a4ac14ffb07b03cfe8a312a23
Author: Alireza Samadian <al...@gmail.com>
AuthorDate: Tue Jun 11 09:09:23 2019 -0700
[BEAM-7513] Implements row estimation for BigQuery.
---
.../apache/beam/sdk/extensions/sql/BeamSqlCli.java | 7 +-
.../beam/sdk/extensions/sql/BeamSqlTable.java | 7 +
.../beam/sdk/extensions/sql/SqlTransform.java | 2 +
.../sql/impl/BeamCalciteSchemaFactory.java | 12 +-
.../sdk/extensions/sql/impl/BeamCalciteTable.java | 23 +++
.../BeamRowCountStatistics.java} | 37 +++--
.../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 26 +++-
.../beam/sdk/extensions/sql/impl/JdbcDriver.java | 34 ++++-
.../sql/meta/provider/bigquery/BigQueryTable.java | 39 +++++
.../sdk/extensions/sql/impl/BeamSqlEnvTest.java | 7 +-
.../sdk/extensions/sql/impl/JdbcDriverTest.java | 31 ++--
.../extensions/sql/impl/parser/BeamDDLTest.java | 7 +-
...BeamSqlBuiltinFunctionsIntegrationTestBase.java | 7 +-
.../meta/provider/bigquery/BigQueryRowCountIT.java | 161 +++++++++++++++++++++
.../meta/provider/bigquery/BigQueryTestTable.java | 45 ++++++
.../bigquery/BigQueryTestTableProvider.java | 71 +++++++++
.../sql/meta/provider/pubsub/PubsubJsonIT.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 23 +++
18 files changed, 496 insertions(+), 45 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 5e44c6c..8bdb1bf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
/** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */
@Experimental
@@ -34,15 +35,17 @@ public class BeamSqlCli {
private MetaStore metaStore;
public BeamSqlCli metaStore(MetaStore metaStore) {
- return metaStore(metaStore, false);
+ return metaStore(metaStore, false, PipelineOptionsFactory.create());
}
- public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
+ public BeamSqlCli metaStore(
+ MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) {
this.metaStore = metaStore;
BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
if (autoLoadUdfUdaf) {
builder.autoLoadUserDefinedFunctions();
}
+ builder.setPipelineOptions(pipelineOptions);
this.env = builder.build();
return this;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 14f1b80..63f7158 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.extensions.sql;
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -36,4 +38,9 @@ public interface BeamSqlTable {
/** Get the schema info of the table. */
Schema getSchema();
+
+ /** Estimates the number of rows, or returns {@link BeamRowCountStatistics#UNKNOWN} if no estimate is available. */
+ default BeamRowCountStatistics getRowCount(PipelineOptions options) {
+ return BeamRowCountStatistics.UNKNOWN;
+ }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index e45daca..afa4438 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -118,6 +118,8 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
sqlEnvBuilder.setQueryPlannerClassName(
input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName());
+ sqlEnvBuilder.setPipelineOptions(input.getPipeline().getOptions());
+
BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString()));
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
index f6a016b..339810c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
@@ -51,8 +51,8 @@ import org.apache.calcite.schema.Table;
* a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection
* properties).
*
- * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid
- * loading all available table providers.
+ * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider,
+ * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers.
*/
class BeamCalciteSchemaFactory {
@@ -97,10 +97,10 @@ class BeamCalciteSchemaFactory {
}
/**
- * This is the override to create an empty schema, used in {@link
- * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is
- * expected to be replaced by an actual functional schema by the same code that specified this
- * override in the first place.
+ * This is the override to create an empty schema, used in {@link JdbcDriver#connect(TableProvider
+ * , org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all table providers. This
+ * schema is expected to be replaced by an actual functional schema by the same code that
+ * specified this override in the first place.
*/
public static class Empty extends InitialEmptySchema implements SchemaFactory {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index e800c82..e53cb04 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -21,9 +21,11 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.linq4j.QueryProvider;
@@ -38,6 +40,8 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Statistic;
+import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.TranslatableTable;
/** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -62,6 +66,25 @@ public class BeamCalciteTable extends AbstractQueryableTable
}
@Override
+ public Statistic getStatistic() {
+ /*
+ Changing the class loader is required for the JDBC path. It is similar to what is done in
+ {@link BeamEnumerableConverter#toRowList} and {@link BeamEnumerableConverter#toEnumerable}.
+ */
+ final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
+ BeamRowCountStatistics beamStatistics =
+ beamTable.getRowCount(BeamEnumerableConverter.createPipelineOptions(pipelineOptions));
+ return beamStatistics.isUnknown()
+ ? Statistics.UNKNOWN
+ : Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of());
+ } finally {
+ Thread.currentThread().setContextClassLoader(originalClassLoader);
+ }
+ }
+
+ @Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptions);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
similarity index 51%
copy from sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
copy to sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
index 14f1b80..ac0431d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
@@ -15,25 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.sql;
+package org.apache.beam.sdk.extensions.sql.impl;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
+import java.io.Serializable;
+import java.math.BigInteger;
-/** This interface defines a Beam Sql Table. */
-public interface BeamSqlTable {
- /** create a {@code PCollection<Row>} from source. */
- PCollection<Row> buildIOReader(PBegin begin);
+/** This class stores row count statistics. */
+public class BeamRowCountStatistics implements Serializable {
+ public static final BeamRowCountStatistics UNKNOWN = new BeamRowCountStatistics(null);
+ private final BigInteger rowCount;
- /** create a {@code IO.write()} instance to write to target. */
- POutput buildIOWriter(PCollection<Row> input);
+ private BeamRowCountStatistics(BigInteger rowCount) {
+ this.rowCount = rowCount;
+ }
- /** Whether this table is bounded (known to be finite) or unbounded (may or may not be finite). */
- PCollection.IsBounded isBounded();
+ public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger rowCount) {
+ return new BeamRowCountStatistics(rowCount);
+ }
- /** Get the schema info of the table. */
- Schema getSchema();
+ /** Returns true if the row count cannot be estimated. */
+ public boolean isUnknown() {
+ return rowCount == null;
+ }
+
+ public BigInteger getRowCount() {
+ return rowCount;
+ }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index 02b3e69..ae4238d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
@@ -66,14 +68,26 @@ public class BeamSqlEnv {
return new BeamSqlEnvBuilder(tableProvider);
}
+ /**
+ * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
+ * Pipeline Options. It should only be used in tests.
+ */
public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
return withTableProvider(new ReadOnlyTableProvider(tableType, tables));
}
+ /**
+ * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
+ * Pipeline Options. It should only be used in tests.
+ */
public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
- return builder(tableProvider).build();
+ return builder(tableProvider).setPipelineOptions(PipelineOptionsFactory.create()).build();
}
+ /**
+ * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
+ * Pipeline Options. It should only be used in tests.
+ */
public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
for (TableProvider tableProvider : tableProviders) {
@@ -123,6 +137,7 @@ public class BeamSqlEnv {
private Set<Map.Entry<String, Function>> functionSet;
private boolean autoLoadBuiltinFunctions;
private boolean autoLoadUdfs;
+ private PipelineOptions pipelineOptions;
private BeamSqlEnvBuilder(TableProvider tableProvider) {
checkNotNull(tableProvider, "Table provider for the default schema must be set.");
@@ -133,6 +148,7 @@ public class BeamSqlEnv {
functionSet = new HashSet<>();
autoLoadUdfs = false;
autoLoadBuiltinFunctions = false;
+ pipelineOptions = null;
}
/** Add a top-level schema backed by the table provider. */
@@ -194,14 +210,20 @@ public class BeamSqlEnv {
return this;
}
+ public BeamSqlEnvBuilder setPipelineOptions(PipelineOptions pipelineOptions) {
+ this.pipelineOptions = pipelineOptions;
+ return this;
+ }
+
/**
* Build function to create an instance of BeamSqlEnv based on preset fields.
*
* @return BeamSqlEnv.
*/
public BeamSqlEnv build() {
+ checkNotNull(pipelineOptions);
- JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider);
+ JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider, pipelineOptions);
configureSchemas(jdbcConnection);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index bb7cc42..4dfabb8 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -20,11 +20,14 @@ package org.apache.beam.sdk.extensions.sql.impl;
import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -100,6 +103,8 @@ public class JdbcDriver extends Driver {
INSTANCE.register();
}
+ public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
@Override
protected AvaticaFactory createFactory() {
return JdbcFactory.wrap((CalciteFactory) super.createFactory());
@@ -141,7 +146,7 @@ public class JdbcDriver extends Driver {
* not this path. The CLI ends up using the schema factory that populates the default schema with
* all table providers it can find. See {@link BeamCalciteSchemaFactory}.
*/
- public static JdbcConnection connect(TableProvider tableProvider) {
+ public static JdbcConnection connect(TableProvider tableProvider, PipelineOptions options) {
try {
Properties properties = new Properties();
properties.setProperty(
@@ -149,9 +154,36 @@ public class JdbcDriver extends Driver {
JdbcConnection connection =
(JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
+ connection.setPipelineOptionsMap(getOptionsMap(options));
return connection;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
+
+ /** Converts {@link PipelineOptions} to its map format. */
+ private static Map<String, String> getOptionsMap(PipelineOptions options) {
+ Map map = OBJECT_MAPPER.convertValue(options, Map.class);
+
+ map = (Map) map.get("options");
+ if (map == null) {
+ map = new HashMap();
+ }
+
+ Map<String, String> optionMap = new HashMap<>();
+ for (Object entry : map.entrySet()) {
+ Map.Entry ent = (Map.Entry) entry;
+ String value;
+ try {
+ value =
+ (ent.getValue() instanceof String)
+ ? ent.getValue().toString()
+ : OBJECT_MAPPER.writeValueAsString(ent.getValue());
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Unable to parse Pipeline Options", e);
+ }
+ optionMap.put(ent.getKey().toString(), value);
+ }
+ return optionMap;
+ }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 621f149..222e4cf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -17,19 +17,27 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+import java.io.IOException;
import java.io.Serializable;
+import java.math.BigInteger;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* {@code BigQueryTable} represent a BigQuery table as a target. This provider does not currently
@@ -39,6 +47,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleF
class BigQueryTable extends BaseBeamTable implements Serializable {
@VisibleForTesting final String bqLocation;
private final ConversionOptions conversionOptions;
+ private BeamRowCountStatistics rowCountStatistics = null;
+ private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class);
BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
super(table.getSchema());
@@ -47,6 +57,16 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
}
@Override
+ public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+
+ if (rowCountStatistics == null) {
+ rowCountStatistics = getRowCountFromBQ(options, bqLocation);
+ }
+
+ return rowCountStatistics;
+ }
+
+ @Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
}
@@ -72,4 +92,23 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
.withFormatFunction(BigQueryUtils.toTableRow())
.to(bqLocation));
}
+
+ private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) {
+ try {
+ BigInteger rowCount =
+ BigQueryHelpers.getNumRows(
+ o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation));
+
+ if (rowCount == null) {
+ return BeamRowCountStatistics.UNKNOWN;
+ }
+
+ return BeamRowCountStatistics.createBoundedTableStatistics(rowCount);
+
+ } catch (IOException | InterruptedException e) {
+ LOGGER.warn("Could not get the row count for the table " + bqLocation, e);
+ }
+
+ return BeamRowCountStatistics.UNKNOWN;
+ }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index 517309d..3b6dda0 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -24,6 +24,7 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import java.sql.Connection;
import java.sql.ResultSet;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -42,6 +43,7 @@ public class BeamSqlEnvTest {
BeamSqlEnv.builder(root)
.addSchema("nested", nested)
.addSchema("anotherOne", anotherOne)
+ .setPipelineOptions(PipelineOptionsFactory.create())
.build();
Connection connection = env.connection;
@@ -60,6 +62,9 @@ public class BeamSqlEnvTest {
exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
TestTableProvider root = new TestTableProvider();
- BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build();
+ BeamSqlEnv.builder(root)
+ .setQueryPlannerClassName("org.test.ClassNotFound")
+ .setPipelineOptions(PipelineOptionsFactory.create())
+ .build();
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 6f36173..3272d00 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -46,6 +46,7 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.Row;
@@ -197,7 +198,7 @@ public class JdbcDriverTest {
@Test
public void testSelectsFromExistingTable() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider);
+ Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
connection
.createStatement()
@@ -219,7 +220,7 @@ public class JdbcDriverTest {
@Test
public void testTimestampWithDefaultTimezone() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider);
+ Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
// A table with one TIMESTAMP column
Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -250,7 +251,7 @@ public class JdbcDriverTest {
public void testTimestampWithNonzeroTimezone() throws Exception {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"), Locale.ROOT);
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider);
+ Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
// A table with one TIMESTAMP column
Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -280,7 +281,7 @@ public class JdbcDriverTest {
public void testTimestampWithZeroTimezone() throws Exception {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider);
+ Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
// A table with one TIMESTAMP column
Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -309,7 +310,7 @@ public class JdbcDriverTest {
@Test
public void testSelectsFromExistingComplexTable() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider);
+ Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
connection
.createStatement()
@@ -343,7 +344,7 @@ public class JdbcDriverTest {
@Test
public void testInsertIntoCreatedTable() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider);
+ Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
connection
.createStatement()
@@ -369,7 +370,8 @@ public class JdbcDriverTest {
@Test
public void testInternalConnect_boundedTable() throws Exception {
- CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+ CalciteConnection connection =
+ JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM test");
assertTrue(resultSet.next());
@@ -392,7 +394,8 @@ public class JdbcDriverTest {
.addRows(1, "second first")
.addRows(2, "second")));
- CalciteConnection connection = JdbcDriver.connect(tableProvider);
+ CalciteConnection connection =
+ JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
Statement statement = connection.createStatement();
ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 5");
assertTrue(resultSet1.next());
@@ -432,7 +435,8 @@ public class JdbcDriverTest {
.timestampColumnIndex(3)
.addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)));
- CalciteConnection connection = JdbcDriver.connect(tableProvider);
+ CalciteConnection connection =
+ JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
Statement statement = connection.createStatement();
ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 1");
@@ -470,7 +474,8 @@ public class JdbcDriverTest {
@Test
public void testInternalConnect_setDirectRunner() throws Exception {
- CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+ CalciteConnection connection =
+ JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
Statement statement = connection.createStatement();
assertEquals(0, statement.executeUpdate("SET runner = direct"));
assertTrue(statement.execute("SELECT * FROM test"));
@@ -480,7 +485,8 @@ public class JdbcDriverTest {
public void testInternalConnect_setBogusRunner() throws Exception {
thrown.expectMessage("Unknown 'runner' specified 'bogus'");
- CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+ CalciteConnection connection =
+ JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
Statement statement = connection.createStatement();
assertEquals(0, statement.executeUpdate("SET runner = bogus"));
assertTrue(statement.execute("SELECT * FROM test"));
@@ -488,7 +494,8 @@ public class JdbcDriverTest {
@Test
public void testInternalConnect_resetAll() throws Exception {
- CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
+ CalciteConnection connection =
+ JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
Statement statement = connection.createStatement();
assertEquals(0, statement.executeUpdate("SET runner = bogus"));
assertEquals(0, statement.executeUpdate("RESET ALL"));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index b7f4215..5d6f460 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.junit.Test;
@@ -167,7 +168,11 @@ public class BeamDDLTest {
TestTableProvider rootProvider = new TestTableProvider();
TestTableProvider testProvider = new TestTableProvider();
- BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build();
+ BeamSqlEnv env =
+ BeamSqlEnv.builder(rootProvider)
+ .addSchema("test", testProvider)
+ .setPipelineOptions(PipelineOptionsFactory.create())
+ .build();
assertNull(testProvider.getTables().get("person"));
env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 22430ba..638c20f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -37,6 +37,7 @@ import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -331,7 +332,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
*/
public void check(Pipeline pipeline) throws Exception {
checkPTransform(pipeline);
- checkJdbc();
+ checkJdbc(pipeline.getOptions());
}
private static final Schema DUMMY_SCHEMA = Schema.builder().addBooleanField("dummy").build();
@@ -353,7 +354,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
Schema.FieldType.STRING, "name")
.addRows(1, "first")));
- private void checkJdbc() throws Exception {
+ private void checkJdbc(PipelineOptions pipelineOptions) throws Exception {
// Beam SQL code is only invoked when the calling convention insists on it, so we
// have to express this as selecting from a Beam table, even though the contents are
// irrelevant.
@@ -363,7 +364,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
//
// Here we create a Beam table just to force the calling convention.
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider);
+ Connection connection = JdbcDriver.connect(tableProvider, pipelineOptions);
connection
.createStatement()
.executeUpdate("CREATE EXTERNAL TABLE dummy (dummy BOOLEAN) TYPE 'test'");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
new file mode 100644
index 0000000..23d09fc
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
+import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
+import static org.apache.beam.sdk.schemas.Schema.toSchema;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.math.BigInteger;
+import java.util.stream.Stream;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for writing to BigQuery with Beam SQL. */
+@RunWith(JUnit4.class)
+public class BigQueryRowCountIT {
+ private static final Schema SOURCE_SCHEMA =
+ Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build();
+ private static final String FAKE_JOB_NAME = "testPipelineOptionInjectionFakeJobName";
+
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public transient TestPipeline readingPipeline = TestPipeline.create();
+ @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA);
+
+ @Test
+ public void testEmptyTable() {
+ BigQueryTableProvider provider = new BigQueryTableProvider();
+ Table table = getTable("testTable", bigQuery.tableSpec());
+ BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+ BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+ assertNotNull(size);
+ assertEquals(BigInteger.ZERO, size.getRowCount());
+ }
+
+ @Test
+ public void testNonEmptyTable() {
+ BigQueryTableProvider provider = new BigQueryTableProvider();
+ Table table = getTable("testTable", bigQuery.tableSpec());
+
+ pipeline
+ .apply(
+ Create.of(
+ new TableRow().set("id", 1).set("name", "name1"),
+ new TableRow().set("id", 2).set("name", "name2"),
+ new TableRow().set("id", 3).set("name", "name3"))
+ .withCoder(TableRowJsonCoder.of()))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(bigQuery.tableSpec())
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("id").setType("INTEGER"),
+ new TableFieldSchema().setName("name").setType("STRING"))))
+ .withoutValidation());
+ pipeline.run().waitUntilFinish();
+
+ BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+ BeamRowCountStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+
+ assertNotNull(size1);
+ assertEquals(BigInteger.valueOf(3), size1.getRowCount());
+ }
+
+ /** Tests that the pipeline options are injected along the SqlTransform execution path. */
+ @Test
+ public void testPipelineOptionInjection() {
+ BigQueryTestTableProvider provider = new BigQueryTestTableProvider();
+ Table table = getTable("testTable", bigQuery.tableSpec());
+ provider.addTable("testTable", table);
+
+ pipeline
+ .apply(
+ Create.of(
+ new TableRow().set("id", 1).set("name", "name1"),
+ new TableRow().set("id", 2).set("name", "name2"),
+ new TableRow().set("id", 3).set("name", "name3"))
+ .withCoder(TableRowJsonCoder.of()))
+ .apply(
+ BigQueryIO.writeTableRows()
+ .to(bigQuery.tableSpec())
+ .withSchema(
+ new TableSchema()
+ .setFields(
+ ImmutableList.of(
+ new TableFieldSchema().setName("id").setType("INTEGER"),
+ new TableFieldSchema().setName("name").setType("STRING"))))
+ .withoutValidation());
+ pipeline.run().waitUntilFinish();
+
+ // changing pipeline options
+ readingPipeline.getOptions().setJobName(FAKE_JOB_NAME);
+
+ // Reading from the table should update the statistics of bigQuery table
+ readingPipeline.apply(
+ SqlTransform.query(" select * from testTable ")
+ .withDefaultTableProvider("bigquery", provider));
+
+ readingPipeline.run().waitUntilFinish();
+
+ BigQueryTestTable sqlTable = (BigQueryTestTable) provider.buildBeamSqlTable(table);
+ assertEquals(FAKE_JOB_NAME, sqlTable.getJobName());
+ }
+
+ @Test
+ public void testFakeTable() {
+ BigQueryTableProvider provider = new BigQueryTableProvider();
+ Table table = getTable("fakeTable", "project:dataset.table");
+
+ BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
+ BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
+ assertTrue(size.isUnknown());
+ }
+
+ private static Table getTable(String name, String location) {
+ return Table.builder()
+ .name(name)
+ .comment(name + " table")
+ .location(location)
+ .schema(
+ Stream.of(Schema.Field.nullable("id", INT64), Schema.Field.nullable("name", STRING))
+ .collect(toSchema()))
+ .type("bigquery")
+ .build();
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
new file mode 100644
index 0000000..db954ae
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+/**
+ * A BigQueryTable that records the jobName from the pipeline options whenever its row count is
+ * queried. It exists solely for {@link BigQueryRowCountIT#testPipelineOptionInjection()}.
+ */
+public class BigQueryTestTable extends BigQueryTable {
+ private String jobName = null;
+
+ BigQueryTestTable(Table table, BigQueryUtils.ConversionOptions options) {
+ super(table, options);
+ }
+
+ @Override
+ public BeamRowCountStatistics getRowCount(PipelineOptions options) {
+ jobName = options.getJobName();
+ return super.getRowCount(options);
+ }
+
+ String getJobName() {
+ return this.jobName;
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
new file mode 100644
index 0000000..d8656ea
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
+
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+
+/** A test table provider for BigQueryRowCountIT. */
+public class BigQueryTestTableProvider extends BigQueryTableProvider {
+
+ private Map<String, Table> tableSpecMap;
+ private Map<String, BeamSqlTable> beamSqlTableMap;
+
+ BigQueryTestTableProvider() {
+ super();
+ tableSpecMap = new HashMap<>();
+ beamSqlTableMap = new HashMap<>();
+ }
+
+ void addTable(String name, Table table) {
+ tableSpecMap.put(name, table);
+ }
+
+ @Nullable
+ @Override
+ public Table getTable(String tableName) {
+ return tableSpecMap.get(tableName);
+ }
+
+ @Override
+ public BeamSqlTable buildBeamSqlTable(Table table) {
+ BeamSqlTable t = beamSqlTableMap.get(table.getLocation());
+ if (t != null) {
+ return t;
+ }
+
+ t =
+ new BigQueryTestTable(
+ table,
+ BigQueryUtils.ConversionOptions.builder()
+ .setTruncateTimestamps(
+ firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false)
+ ? BigQueryUtils.ConversionOptions.TruncateTimestamps.TRUNCATE
+ : BigQueryUtils.ConversionOptions.TruncateTimestamps.REJECT)
+ .build());
+ beamSqlTableMap.put(table.getLocation(), t);
+
+ return t;
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index e8fa135..1e89240 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -348,7 +348,7 @@ public class PubsubJsonIT implements Serializable {
inMemoryMetaStore.registerProvider(tableProvider);
}
- JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore);
+ JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore, options);
connection.setPipelineOptionsMap(argsMap);
return connection;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 82bb5ad..6c23708 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -25,12 +25,14 @@ import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.IOException;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -541,6 +543,27 @@ public class BigQueryHelpers {
}
}
+ /**
+ * Returns the number of rows in the given BigQuery table, or null if the table does not exist.
+ *
+ * @param options the BigQuery pipeline options used to create the client
+ * @param tableRef a reference identifying the BigQuery table to inspect
+ * @return the number of rows in the table, or null if the table was not found
+ * @throws InterruptedException if the table lookup is interrupted
+ * @throws IOException if the BigQuery service call fails
+ */
+ @Nullable
+ public static BigInteger getNumRows(BigQueryOptions options, TableReference tableRef)
+ throws InterruptedException, IOException {
+
+ DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options);
+ Table table = datasetService.getTable(tableRef);
+ if (table == null) {
+ return null;
+ }
+ return table.getNumRows();
+ }
+
static String getDatasetLocation(
DatasetService datasetService, String projectId, String datasetId) {
Dataset dataset;