You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by an...@apache.org on 2019/06/17 23:48:50 UTC
[beam] 01/01: Revert "[BEAM-7513] Adding Row Count for Bigquery
Table"
This is an automated email from the ASF dual-hosted git repository.
anton pushed a commit to branch revert-8822-bigquery-rowcount
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7b7c21544b50b1c8ac96f6c88362a0e266b6d55e
Author: Anton Kedin <33...@users.noreply.github.com>
AuthorDate: Mon Jun 17 16:48:30 2019 -0700
Revert "[BEAM-7513] Adding Row Count for Bigquery Table"
---
.../apache/beam/sdk/extensions/sql/BeamSqlCli.java | 7 +-
.../beam/sdk/extensions/sql/BeamSqlTable.java | 7 -
.../beam/sdk/extensions/sql/SqlTransform.java | 2 -
.../sql/impl/BeamCalciteSchemaFactory.java | 12 +-
.../sdk/extensions/sql/impl/BeamCalciteTable.java | 23 ---
.../sql/impl/BeamRowCountStatistics.java | 44 ------
.../beam/sdk/extensions/sql/impl/BeamSqlEnv.java | 26 +---
.../beam/sdk/extensions/sql/impl/JdbcDriver.java | 34 +----
.../sql/meta/provider/bigquery/BigQueryTable.java | 39 -----
.../sdk/extensions/sql/impl/BeamSqlEnvTest.java | 7 +-
.../sdk/extensions/sql/impl/JdbcDriverTest.java | 31 ++--
.../extensions/sql/impl/parser/BeamDDLTest.java | 7 +-
...BeamSqlBuiltinFunctionsIntegrationTestBase.java | 7 +-
.../meta/provider/bigquery/BigQueryRowCountIT.java | 161 ---------------------
.../meta/provider/bigquery/BigQueryTestTable.java | 45 ------
.../bigquery/BigQueryTestTableProvider.java | 71 ---------
.../sql/meta/provider/pubsub/PubsubJsonIT.java | 2 +-
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 23 ---
18 files changed, 29 insertions(+), 519 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
index 8bdb1bf..5e44c6c 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlCli.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
/** {@link BeamSqlCli} provides methods to execute Beam SQL with an interactive client. */
@Experimental
@@ -35,17 +34,15 @@ public class BeamSqlCli {
private MetaStore metaStore;
public BeamSqlCli metaStore(MetaStore metaStore) {
- return metaStore(metaStore, false, PipelineOptionsFactory.create());
+ return metaStore(metaStore, false);
}
- public BeamSqlCli metaStore(
- MetaStore metaStore, boolean autoLoadUdfUdaf, PipelineOptions pipelineOptions) {
+ public BeamSqlCli metaStore(MetaStore metaStore, boolean autoLoadUdfUdaf) {
this.metaStore = metaStore;
BeamSqlEnv.BeamSqlEnvBuilder builder = BeamSqlEnv.builder(metaStore);
if (autoLoadUdfUdaf) {
builder.autoLoadUserDefinedFunctions();
}
- builder.setPipelineOptions(pipelineOptions);
this.env = builder.build();
return this;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
index 63f7158..14f1b80 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlTable.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.extensions.sql;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -38,9 +36,4 @@ public interface BeamSqlTable {
/** Get the schema info of the table. */
Schema getSchema();
-
- /** Estimates the number of rows or returns null if there is no estimation. */
- default BeamRowCountStatistics getRowCount(PipelineOptions options) {
- return BeamRowCountStatistics.UNKNOWN;
- }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
index afa4438..e45daca 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/SqlTransform.java
@@ -118,8 +118,6 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>
sqlEnvBuilder.setQueryPlannerClassName(
input.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getPlannerName());
- sqlEnvBuilder.setPipelineOptions(input.getPipeline().getOptions());
-
BeamSqlEnv sqlEnv = sqlEnvBuilder.build();
return BeamSqlRelUtils.toPCollection(input.getPipeline(), sqlEnv.parseQuery(queryString()));
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
index 339810c..f6a016b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchemaFactory.java
@@ -51,8 +51,8 @@ import org.apache.calcite.schema.Table;
* a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection
* properties).
*
- * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider,
- * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers.
+ * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider)} to avoid
+ * loading all available table providers.
*/
class BeamCalciteSchemaFactory {
@@ -97,10 +97,10 @@ class BeamCalciteSchemaFactory {
}
/**
- * This is the override to create an empty schema, used in {@link JdbcDriver#connect(TableProvider
- * , org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all table providers. This
- * schema is expected to be replaced by an actual functional schema by the same code that
- * specified this override in the first place.
+ * This is the override to create an empty schema, used in {@link
+ * JdbcDriver#connect(TableProvider)} to avoid loading all table providers. This schema is
+ * expected to be replaced by an actual functional schema by the same code that specified this
+ * override in the first place.
*/
public static class Empty extends InitialEmptySchema implements SchemaFactory {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index e53cb04..e800c82 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -21,11 +21,9 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.impl.rel.BeamEnumerableConverter;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSinkRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.calcite.linq4j.QueryProvider;
@@ -40,8 +38,6 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.schema.ModifiableTable;
import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Statistic;
-import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.TranslatableTable;
/** Adapter from {@link BeamSqlTable} to a calcite Table. */
@@ -66,25 +62,6 @@ public class BeamCalciteTable extends AbstractQueryableTable
}
@Override
- public Statistic getStatistic() {
- /*
- Changing class loader is required for the JDBC path. It is similar to what done in
- {@link BeamEnumerableConverter#toRowList} and {@link BeamEnumerableConverter#toEnumerable }.
- */
- final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
- try {
- Thread.currentThread().setContextClassLoader(BeamEnumerableConverter.class.getClassLoader());
- BeamRowCountStatistics beamStatistics =
- beamTable.getRowCount(BeamEnumerableConverter.createPipelineOptions(pipelineOptions));
- return beamStatistics.isUnknown()
- ? Statistics.UNKNOWN
- : Statistics.of(beamStatistics.getRowCount().doubleValue(), ImmutableList.of());
- } finally {
- Thread.currentThread().setContextClassLoader(originalClassLoader);
- }
- }
-
- @Override
public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
return new BeamIOSourceRel(context.getCluster(), relOptTable, beamTable, pipelineOptions);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
deleted file mode 100644
index ac0431d..0000000
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamRowCountStatistics.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-
-/** This class stores row count statistics. */
-public class BeamRowCountStatistics implements Serializable {
- public static final BeamRowCountStatistics UNKNOWN = new BeamRowCountStatistics(null);
- private final BigInteger rowCount;
-
- private BeamRowCountStatistics(BigInteger rowCount) {
- this.rowCount = rowCount;
- }
-
- public static BeamRowCountStatistics createBoundedTableStatistics(BigInteger rowCount) {
- return new BeamRowCountStatistics(rowCount);
- }
-
- /** Is true if the row count cannot be estimated. */
- public boolean isUnknown() {
- return rowCount == null;
- }
-
- public BigInteger getRowCount() {
- return rowCount;
- }
-}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
index ae4238d..02b3e69 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnv.java
@@ -38,8 +38,6 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.UdfUdafProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
@@ -68,26 +66,14 @@ public class BeamSqlEnv {
return new BeamSqlEnvBuilder(tableProvider);
}
- /**
- * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
- * Pipeline Options. It should only be used in tests.
- */
public static BeamSqlEnv readOnly(String tableType, Map<String, BeamSqlTable> tables) {
return withTableProvider(new ReadOnlyTableProvider(tableType, tables));
}
- /**
- * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty
- * Pipeline Options. It should only be used in tests.
- */
public static BeamSqlEnv withTableProvider(TableProvider tableProvider) {
- return builder(tableProvider).setPipelineOptions(PipelineOptionsFactory.create()).build();
+ return builder(tableProvider).build();
}
- /**
- * This method creates {@link org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv} using empty *
- * Pipeline Options. It should only be used in tests.
- */
public static BeamSqlEnv inMemory(TableProvider... tableProviders) {
InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
for (TableProvider tableProvider : tableProviders) {
@@ -137,7 +123,6 @@ public class BeamSqlEnv {
private Set<Map.Entry<String, Function>> functionSet;
private boolean autoLoadBuiltinFunctions;
private boolean autoLoadUdfs;
- private PipelineOptions pipelineOptions;
private BeamSqlEnvBuilder(TableProvider tableProvider) {
checkNotNull(tableProvider, "Table provider for the default schema must be sets.");
@@ -148,7 +133,6 @@ public class BeamSqlEnv {
functionSet = new HashSet<>();
autoLoadUdfs = false;
autoLoadBuiltinFunctions = false;
- pipelineOptions = null;
}
/** Add a top-level schema backed by the table provider. */
@@ -210,20 +194,14 @@ public class BeamSqlEnv {
return this;
}
- public BeamSqlEnvBuilder setPipelineOptions(PipelineOptions pipelineOptions) {
- this.pipelineOptions = pipelineOptions;
- return this;
- }
-
/**
* Build function to create an instance of BeamSqlEnv based on preset fields.
*
* @return BeamSqlEnv.
*/
public BeamSqlEnv build() {
- checkNotNull(pipelineOptions);
- JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider, pipelineOptions);
+ JdbcConnection jdbcConnection = JdbcDriver.connect(defaultTableProvider);
configureSchemas(jdbcConnection);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
index 4dfabb8..bb7cc42 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriver.java
@@ -20,14 +20,11 @@ package org.apache.beam.sdk.extensions.sql.impl;
import static org.apache.calcite.config.CalciteConnectionProperty.SCHEMA_FACTORY;
import static org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -103,8 +100,6 @@ public class JdbcDriver extends Driver {
INSTANCE.register();
}
- public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
@Override
protected AvaticaFactory createFactory() {
return JdbcFactory.wrap((CalciteFactory) super.createFactory());
@@ -146,7 +141,7 @@ public class JdbcDriver extends Driver {
* not this path. The CLI ends up using the schema factory that populates the default schema with
* all table providers it can find. See {@link BeamCalciteSchemaFactory}.
*/
- public static JdbcConnection connect(TableProvider tableProvider, PipelineOptions options) {
+ public static JdbcConnection connect(TableProvider tableProvider) {
try {
Properties properties = new Properties();
properties.setProperty(
@@ -154,36 +149,9 @@ public class JdbcDriver extends Driver {
JdbcConnection connection =
(JdbcConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, properties);
connection.setSchema(TOP_LEVEL_BEAM_SCHEMA, tableProvider);
- connection.setPipelineOptionsMap(getOptionsMap(options));
return connection;
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
-
- /** Converts {@link PipelineOptions} to its map format. */
- private static Map<String, String> getOptionsMap(PipelineOptions options) {
- Map map = OBJECT_MAPPER.convertValue(options, Map.class);
-
- map = (Map) map.get("options");
- if (map == null) {
- map = new HashMap();
- }
-
- Map<String, String> optionMap = new HashMap<>();
- for (Object entry : map.entrySet()) {
- Map.Entry ent = (Map.Entry) entry;
- String value;
- try {
- value =
- (ent.getValue() instanceof String)
- ? ent.getValue().toString()
- : OBJECT_MAPPER.writeValueAsString(ent.getValue());
- } catch (Exception e) {
- throw new IllegalArgumentException("Unable to parse Pipeline Options", e);
- }
- optionMap.put(ent.getKey().toString(), value);
- }
- return optionMap;
- }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 222e4cf..621f149 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -17,27 +17,19 @@
*/
package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-import java.io.IOException;
import java.io.Serializable;
-import java.math.BigInteger;
import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@code BigQueryTable} represent a BigQuery table as a target. This provider does not currently
@@ -47,8 +39,6 @@ import org.slf4j.LoggerFactory;
class BigQueryTable extends BaseBeamTable implements Serializable {
@VisibleForTesting final String bqLocation;
private final ConversionOptions conversionOptions;
- private BeamRowCountStatistics rowCountStatistics = null;
- private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTable.class);
BigQueryTable(Table table, BigQueryUtils.ConversionOptions options) {
super(table.getSchema());
@@ -57,16 +47,6 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
}
@Override
- public BeamRowCountStatistics getRowCount(PipelineOptions options) {
-
- if (rowCountStatistics == null) {
- rowCountStatistics = getRowCountFromBQ(options, bqLocation);
- }
-
- return rowCountStatistics;
- }
-
- @Override
public PCollection.IsBounded isBounded() {
return PCollection.IsBounded.BOUNDED;
}
@@ -92,23 +72,4 @@ class BigQueryTable extends BaseBeamTable implements Serializable {
.withFormatFunction(BigQueryUtils.toTableRow())
.to(bqLocation));
}
-
- private static BeamRowCountStatistics getRowCountFromBQ(PipelineOptions o, String bqLocation) {
- try {
- BigInteger rowCount =
- BigQueryHelpers.getNumRows(
- o.as(BigQueryOptions.class), BigQueryHelpers.parseTableSpec(bqLocation));
-
- if (rowCount == null) {
- return BeamRowCountStatistics.UNKNOWN;
- }
-
- return BeamRowCountStatistics.createBoundedTableStatistics(rowCount);
-
- } catch (IOException | InterruptedException e) {
- LOGGER.warn("Could not get the row count for the table " + bqLocation, e);
- }
-
- return BeamRowCountStatistics.UNKNOWN;
- }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
index 3b6dda0..517309d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/BeamSqlEnvTest.java
@@ -24,7 +24,6 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import java.sql.Connection;
import java.sql.ResultSet;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -43,7 +42,6 @@ public class BeamSqlEnvTest {
BeamSqlEnv.builder(root)
.addSchema("nested", nested)
.addSchema("anotherOne", anotherOne)
- .setPipelineOptions(PipelineOptionsFactory.create())
.build();
Connection connection = env.connection;
@@ -62,9 +60,6 @@ public class BeamSqlEnvTest {
exceptions.expectCause(hasMessage(containsString("org.test.ClassNotFound")));
TestTableProvider root = new TestTableProvider();
- BeamSqlEnv.builder(root)
- .setQueryPlannerClassName("org.test.ClassNotFound")
- .setPipelineOptions(PipelineOptionsFactory.create())
- .build();
+ BeamSqlEnv.builder(root).setQueryPlannerClassName("org.test.ClassNotFound").build();
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
index 3272d00..6f36173 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java
@@ -46,7 +46,6 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.Row;
@@ -198,7 +197,7 @@ public class JdbcDriverTest {
@Test
public void testSelectsFromExistingTable() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ Connection connection = JdbcDriver.connect(tableProvider);
connection
.createStatement()
@@ -220,7 +219,7 @@ public class JdbcDriverTest {
@Test
public void testTimestampWithDefaultTimezone() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ Connection connection = JdbcDriver.connect(tableProvider);
// A table with one TIMESTAMP column
Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -251,7 +250,7 @@ public class JdbcDriverTest {
public void testTimestampWithNonzeroTimezone() throws Exception {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"), Locale.ROOT);
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ Connection connection = JdbcDriver.connect(tableProvider);
// A table with one TIMESTAMP column
Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -281,7 +280,7 @@ public class JdbcDriverTest {
public void testTimestampWithZeroTimezone() throws Exception {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ Connection connection = JdbcDriver.connect(tableProvider);
// A table with one TIMESTAMP column
Schema schema = Schema.builder().addDateTimeField("ts").build();
@@ -310,7 +309,7 @@ public class JdbcDriverTest {
@Test
public void testSelectsFromExistingComplexTable() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ Connection connection = JdbcDriver.connect(tableProvider);
connection
.createStatement()
@@ -344,7 +343,7 @@ public class JdbcDriverTest {
@Test
public void testInsertIntoCreatedTable() throws Exception {
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ Connection connection = JdbcDriver.connect(tableProvider);
connection
.createStatement()
@@ -370,8 +369,7 @@ public class JdbcDriverTest {
@Test
public void testInternalConnect_boundedTable() throws Exception {
- CalciteConnection connection =
- JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM test");
assertTrue(resultSet.next());
@@ -394,8 +392,7 @@ public class JdbcDriverTest {
.addRows(1, "second first")
.addRows(2, "second")));
- CalciteConnection connection =
- JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ CalciteConnection connection = JdbcDriver.connect(tableProvider);
Statement statement = connection.createStatement();
ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 5");
assertTrue(resultSet1.next());
@@ -435,8 +432,7 @@ public class JdbcDriverTest {
.timestampColumnIndex(3)
.addRows(Duration.ZERO, 1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE)));
- CalciteConnection connection =
- JdbcDriver.connect(tableProvider, PipelineOptionsFactory.create());
+ CalciteConnection connection = JdbcDriver.connect(tableProvider);
Statement statement = connection.createStatement();
ResultSet resultSet1 = statement.executeQuery("SELECT * FROM test LIMIT 1");
@@ -474,8 +470,7 @@ public class JdbcDriverTest {
@Test
public void testInternalConnect_setDirectRunner() throws Exception {
- CalciteConnection connection =
- JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
Statement statement = connection.createStatement();
assertEquals(0, statement.executeUpdate("SET runner = direct"));
assertTrue(statement.execute("SELECT * FROM test"));
@@ -485,8 +480,7 @@ public class JdbcDriverTest {
public void testInternalConnect_setBogusRunner() throws Exception {
thrown.expectMessage("Unknown 'runner' specified 'bogus'");
- CalciteConnection connection =
- JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
Statement statement = connection.createStatement();
assertEquals(0, statement.executeUpdate("SET runner = bogus"));
assertTrue(statement.execute("SELECT * FROM test"));
@@ -494,8 +488,7 @@ public class JdbcDriverTest {
@Test
public void testInternalConnect_resetAll() throws Exception {
- CalciteConnection connection =
- JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create());
+ CalciteConnection connection = JdbcDriver.connect(BOUNDED_TABLE);
Statement statement = connection.createStatement();
assertEquals(0, statement.executeUpdate("SET runner = bogus"));
assertEquals(0, statement.executeUpdate("RESET ALL"));
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
index 5d6f460..b7f4215 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/parser/BeamDDLTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.junit.Test;
@@ -168,11 +167,7 @@ public class BeamDDLTest {
TestTableProvider rootProvider = new TestTableProvider();
TestTableProvider testProvider = new TestTableProvider();
- BeamSqlEnv env =
- BeamSqlEnv.builder(rootProvider)
- .addSchema("test", testProvider)
- .setPipelineOptions(PipelineOptionsFactory.create())
- .build();
+ BeamSqlEnv env = BeamSqlEnv.builder(rootProvider).addSchema("test", testProvider).build();
assertNull(testProvider.getTables().get("person"));
env.executeDdl("CREATE EXTERNAL TABLE test.person (id INT) TYPE text");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
index 638c20f..22430ba 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/integrationtest/BeamSqlBuiltinFunctionsIntegrationTestBase.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
@@ -332,7 +331,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
*/
public void check(Pipeline pipeline) throws Exception {
checkPTransform(pipeline);
- checkJdbc(pipeline.getOptions());
+ checkJdbc();
}
private static final Schema DUMMY_SCHEMA = Schema.builder().addBooleanField("dummy").build();
@@ -354,7 +353,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
Schema.FieldType.STRING, "name")
.addRows(1, "first")));
- private void checkJdbc(PipelineOptions pipelineOptions) throws Exception {
+ private void checkJdbc() throws Exception {
// Beam SQL code is only invoked when the calling convention insists on it, so we
// have to express this as selecting from a Beam table, even though the contents are
// irrelevant.
@@ -364,7 +363,7 @@ public class BeamSqlBuiltinFunctionsIntegrationTestBase {
//
// Here we create a Beam table just to force the calling convention.
TestTableProvider tableProvider = new TestTableProvider();
- Connection connection = JdbcDriver.connect(tableProvider, pipelineOptions);
+ Connection connection = JdbcDriver.connect(tableProvider);
connection
.createStatement()
.executeUpdate("CREATE EXTERNAL TABLE dummy (dummy BOOLEAN) TYPE 'test'");
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
deleted file mode 100644
index 23d09fc..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryRowCountIT.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-
-import static org.apache.beam.sdk.schemas.Schema.FieldType.INT64;
-import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
-import static org.apache.beam.sdk.schemas.Schema.toSchema;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import java.math.BigInteger;
-import java.util.stream.Stream;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.SqlTransform;
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
-import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
-import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Integration tests form writing to BigQuery with Beam SQL. */
-@RunWith(JUnit4.class)
-public class BigQueryRowCountIT {
- private static final Schema SOURCE_SCHEMA =
- Schema.builder().addNullableField("id", INT64).addNullableField("name", STRING).build();
- private static final String FAKE_JOB_NAME = "testPipelineOptionInjectionFakeJobName";
-
- @Rule public transient TestPipeline pipeline = TestPipeline.create();
- @Rule public transient TestPipeline readingPipeline = TestPipeline.create();
- @Rule public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA);
-
- @Test
- public void testEmptyTable() {
- BigQueryTableProvider provider = new BigQueryTableProvider();
- Table table = getTable("testTable", bigQuery.tableSpec());
- BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
- BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
- assertNotNull(size);
- assertEquals(BigInteger.ZERO, size.getRowCount());
- }
-
- @Test
- public void testNonEmptyTable() {
- BigQueryTableProvider provider = new BigQueryTableProvider();
- Table table = getTable("testTable", bigQuery.tableSpec());
-
- pipeline
- .apply(
- Create.of(
- new TableRow().set("id", 1).set("name", "name1"),
- new TableRow().set("id", 2).set("name", "name2"),
- new TableRow().set("id", 3).set("name", "name3"))
- .withCoder(TableRowJsonCoder.of()))
- .apply(
- BigQueryIO.writeTableRows()
- .to(bigQuery.tableSpec())
- .withSchema(
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new TableFieldSchema().setName("id").setType("INTEGER"),
- new TableFieldSchema().setName("name").setType("STRING"))))
- .withoutValidation());
- pipeline.run().waitUntilFinish();
-
- BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
- BeamRowCountStatistics size1 = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
-
- assertNotNull(size1);
- assertEquals(BigInteger.valueOf(3), size1.getRowCount());
- }
-
- /** This tests if the pipeline options are injected in the path of SQL Transform. */
- @Test
- public void testPipelineOptionInjection() {
- BigQueryTestTableProvider provider = new BigQueryTestTableProvider();
- Table table = getTable("testTable", bigQuery.tableSpec());
- provider.addTable("testTable", table);
-
- pipeline
- .apply(
- Create.of(
- new TableRow().set("id", 1).set("name", "name1"),
- new TableRow().set("id", 2).set("name", "name2"),
- new TableRow().set("id", 3).set("name", "name3"))
- .withCoder(TableRowJsonCoder.of()))
- .apply(
- BigQueryIO.writeTableRows()
- .to(bigQuery.tableSpec())
- .withSchema(
- new TableSchema()
- .setFields(
- ImmutableList.of(
- new TableFieldSchema().setName("id").setType("INTEGER"),
- new TableFieldSchema().setName("name").setType("STRING"))))
- .withoutValidation());
- pipeline.run().waitUntilFinish();
-
- // changing pipeline options
- readingPipeline.getOptions().setJobName(FAKE_JOB_NAME);
-
- // Reading from the table should update the statistics of bigQuery table
- readingPipeline.apply(
- SqlTransform.query(" select * from testTable ")
- .withDefaultTableProvider("bigquery", provider));
-
- readingPipeline.run().waitUntilFinish();
-
- BigQueryTestTable sqlTable = (BigQueryTestTable) provider.buildBeamSqlTable(table);
- assertEquals(FAKE_JOB_NAME, sqlTable.getJobName());
- }
-
- @Test
- public void testFakeTable() {
- BigQueryTableProvider provider = new BigQueryTableProvider();
- Table table = getTable("fakeTable", "project:dataset.table");
-
- BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
- BeamRowCountStatistics size = sqlTable.getRowCount(TestPipeline.testingPipelineOptions());
- assertTrue(size.isUnknown());
- }
-
- private static Table getTable(String name, String location) {
- return Table.builder()
- .name(name)
- .comment(name + " table")
- .location(location)
- .schema(
- Stream.of(Schema.Field.nullable("id", INT64), Schema.Field.nullable("name", STRING))
- .collect(toSchema()))
- .type("bigquery")
- .build();
- }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
deleted file mode 100644
index db954ae..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTable.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-
-import org.apache.beam.sdk.extensions.sql.impl.BeamRowCountStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-import org.apache.beam.sdk.options.PipelineOptions;
-
-/**
- * A BigQueryTable that keeps jobName from the pipeline options whenever row count is called. It is
- * made for {@link BigQueryRowCountIT#testPipelineOptionInjection()}
- */
-public class BigQueryTestTable extends BigQueryTable {
- private String jobName = null;
-
- BigQueryTestTable(Table table, BigQueryUtils.ConversionOptions options) {
- super(table, options);
- }
-
- @Override
- public BeamRowCountStatistics getRowCount(PipelineOptions options) {
- jobName = options.getJobName();
- return super.getRowCount(options);
- }
-
- String getJobName() {
- return this.jobName;
- }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
deleted file mode 100644
index d8656ea..0000000
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTestTableProvider.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.bigquery;
-
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
-
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
-
-/** A test table provider for BigQueryRowCountIT. */
-public class BigQueryTestTableProvider extends BigQueryTableProvider {
-
- private Map<String, Table> tableSpecMap;
- private Map<String, BeamSqlTable> beamSqlTableMap;
-
- BigQueryTestTableProvider() {
- super();
- tableSpecMap = new HashMap<>();
- beamSqlTableMap = new HashMap<>();
- }
-
- void addTable(String name, Table table) {
- tableSpecMap.put(name, table);
- }
-
- @Nullable
- @Override
- public Table getTable(String tableName) {
- return tableSpecMap.get(tableName);
- }
-
- @Override
- public BeamSqlTable buildBeamSqlTable(Table table) {
- BeamSqlTable t = beamSqlTableMap.get(table.getLocation());
- if (t != null) {
- return t;
- }
-
- t =
- new BigQueryTestTable(
- table,
- BigQueryUtils.ConversionOptions.builder()
- .setTruncateTimestamps(
- firstNonNull(table.getProperties().getBoolean("truncateTimestamps"), false)
- ? BigQueryUtils.ConversionOptions.TruncateTimestamps.TRUNCATE
- : BigQueryUtils.ConversionOptions.TruncateTimestamps.REJECT)
- .build());
- beamSqlTableMap.put(table.getLocation(), t);
-
- return t;
- }
-}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index 1e89240..e8fa135 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -348,7 +348,7 @@ public class PubsubJsonIT implements Serializable {
inMemoryMetaStore.registerProvider(tableProvider);
}
- JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore, options);
+ JdbcConnection connection = JdbcDriver.connect(inMemoryMetaStore);
connection.setPipelineOptionsMap(argsMap);
return connection;
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 6c23708..82bb5ad 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -25,14 +25,12 @@ import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.IOException;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -543,27 +541,6 @@ public class BigQueryHelpers {
}
}
- /**
- * It returns the number of rows for a given table.
- *
- * @param options
- * @param tableRef
- * @return The number of rows in the table.
- * @throws InterruptedException
- * @throws IOException
- */
- @Nullable
- public static BigInteger getNumRows(BigQueryOptions options, TableReference tableRef)
- throws InterruptedException, IOException {
-
- DatasetService datasetService = new BigQueryServicesImpl().getDatasetService(options);
- Table table = datasetService.getTable(tableRef);
- if (table == null) {
- return null;
- }
- return table.getNumRows();
- }
-
static String getDatasetLocation(
DatasetService datasetService, String projectId, String datasetId) {
Dataset dataset;