You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/18 01:35:26 UTC
[beam] branch release-2.40.0 updated: BigQueryIO: Adding the BASIC view setting to getTable request (#21879)
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch release-2.40.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.40.0 by this push:
new 91ae5bebe55 BigQueryIO: Adding the BASIC view setting to getTable request (#21879)
new 6afcb5bbc71 Merge pull request #21938 from [cherry-pick][release-2.40.0] BigQueryIO: Adding the BASIC view setting to getTable request (#21879)
91ae5bebe55 is described below
commit 91ae5bebe559ccdf45ad00d911259534c12931e3
Author: pablo rodriguez defino <pr...@gmail.com>
AuthorDate: Fri Jun 17 16:50:43 2022 -0700
BigQueryIO: Adding the BASIC view setting to getTable request (#21879)
* Adding the BASIC view to the getTable request to avoid storage-related calculations on the backend (caused by the default option)
* adding a new getTable overload that takes a view param, to keep the change backwards compatible
* spotless apply
* changed create table ptransform to use BASIC view as well
* replaced a couple more occurrences that can take advantage of requesting just the BASIC view
* addressing comments from review
* fixed flaky test
* ran spotless apply
---
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 5 ++++-
.../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 14 ++++++++++++++
.../beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 18 ++++++++++++++++--
.../beam/sdk/io/gcp/bigquery/CreateTableHelpers.java | 4 +++-
.../beam/sdk/io/gcp/bigquery/TableSchemaCache.java | 15 +++++++++++++--
.../beam/sdk/io/gcp/testing/FakeDatasetService.java | 17 +++++++++++++++++
.../sdk/io/gcp/bigquery/BigQueryServicesImplTest.java | 6 +++---
.../beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java | 2 ++
8 files changed, 72 insertions(+), 9 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 3a37a36e7ee..0a62245390b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -33,6 +33,7 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -516,7 +517,9 @@ public class BigQueryHelpers {
static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) {
try {
- if (datasetService.getTable(tableRef) != null) {
+ if (datasetService.getTable(
+ tableRef, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+ != null) {
checkState(
datasetService.isTableEmpty(tableRef),
"BigQuery table is not empty: %s.",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index dd4da2bd7fc..a0484ccf972 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -111,6 +111,16 @@ public interface BigQueryServices extends Serializable {
/** An interface to get, create and delete Cloud BigQuery datasets and tables. */
public interface DatasetService extends AutoCloseable {
+
+ // maps the values at
+ // https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get#TableMetadataView
+ enum TableMetadataView {
+ TABLE_METADATA_VIEW_UNSPECIFIED,
+ BASIC,
+ STORAGE_STATS,
+ FULL;
+ };
+
/**
* Gets the specified {@link Table} resource by table ID.
*
@@ -123,6 +133,10 @@ public interface BigQueryServices extends Serializable {
Table getTable(TableReference tableRef, List<String> selectedFields)
throws InterruptedException, IOException;
+ @Nullable
+ Table getTable(TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+ throws InterruptedException, IOException;
+
/** Creates the specified table if it does not exist. */
void createTable(Table table) throws InterruptedException, IOException;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 50f3988c1c1..a50947a4915 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -616,13 +616,24 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public @Nullable Table getTable(TableReference tableRef, List<String> selectedFields)
throws IOException, InterruptedException {
- return getTable(tableRef, selectedFields, createDefaultBackoff(), Sleeper.DEFAULT);
+ return getTable(tableRef, selectedFields, TableMetadataView.STORAGE_STATS);
+ }
+
+ @Override
+ public @Nullable Table getTable(
+ TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+ throws IOException, InterruptedException {
+ return getTable(tableRef, selectedFields, view, createDefaultBackoff(), Sleeper.DEFAULT);
}
@VisibleForTesting
@Nullable
Table getTable(
- TableReference ref, List<String> selectedFields, BackOff backoff, Sleeper sleeper)
+ TableReference ref,
+ List<String> selectedFields,
+ TableMetadataView view,
+ BackOff backoff,
+ Sleeper sleeper)
throws IOException, InterruptedException {
Tables.Get get =
client
@@ -632,6 +643,9 @@ class BigQueryServicesImpl implements BigQueryServices {
if (!selectedFields.isEmpty()) {
get.setSelectedFields(String.join(",", selectedFields));
}
+ if (view != null) {
+ get.set("view", view.name());
+ }
try {
return executeWithRetries(
get,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
index 403554b263c..426dfbb165e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
@@ -112,7 +112,9 @@ public class CreateTableHelpers {
tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
try (DatasetService datasetService =
bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) {
- if (datasetService.getTable(tableReference) == null) {
+ if (datasetService.getTable(
+ tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+ == null) {
TableSchema tableSchema = schemaSupplier.get();
Preconditions.checkArgumentNotNull(
tableSchema,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index c7fca47a287..ba6063def1c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -166,7 +167,12 @@ public class TableSchemaCache {
if (!schemaHolder.isPresent()) {
// Not initialized. Query the new schema with the monitor released and then update the cache.
try {
- @Nullable Table table = datasetService.getTable(tableReference);
+ // requesting the BASIC view will prevent BQ backend to run calculations
+ // related with storage stats that are not needed here
+ @Nullable
+ Table table =
+ datasetService.getTable(
+ tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
schemaHolder =
Optional.ofNullable((table == null) ? null : SchemaHolder.of(table.getSchema(), 0));
} catch (Exception e) {
@@ -298,7 +304,12 @@ public class TableSchemaCache {
Map<String, TableSchema> schemas = Maps.newHashMapWithExpectedSize(tables.size());
for (Map.Entry<String, Refresh> entry : tables.entrySet()) {
TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
- Table table = entry.getValue().getDatasetService().getTable(tableReference);
+ Table table =
+ entry
+ .getValue()
+ .getDatasetService()
+ .getTable(
+ tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
if (table == null) {
throw new RuntimeException("Did not get value for table " + tableReference);
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 1b4d03ddd01..213d6498a0e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -173,6 +173,23 @@ public class FakeDatasetService implements DatasetService, Serializable {
@Override
public Table getTable(TableReference tableRef, @Nullable List<String> selectedFields)
throws InterruptedException, IOException {
+ return getTable(tableRef, selectedFields, null);
+ }
+
+ @Override
+ public Table getTable(
+ TableReference tableRef,
+ @Nullable List<String> selectedFields,
+ @Nullable TableMetadataView view)
+ throws InterruptedException, IOException {
+ return getTableImpl(tableRef, selectedFields, view);
+ }
+
+ public Table getTableImpl(
+ TableReference tableRef,
+ @Nullable List<String> selectedFields,
+ @Nullable TableMetadataView view)
+ throws InterruptedException, IOException {
synchronized (FakeDatasetService.class) {
Map<String, TableContainer> dataset =
tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 6007d6a11bd..a57299b0a0d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -474,7 +474,7 @@ public class BigQueryServicesImplTest {
Table table =
datasetService.getTable(
- tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+ tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
assertEquals(testTable, table);
verifyAllResponsesAreRead();
@@ -499,7 +499,7 @@ public class BigQueryServicesImplTest {
.setTableId("tableId");
Table table =
datasetService.getTable(
- tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+ tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
assertNull(table);
verifyAllResponsesAreRead();
@@ -526,7 +526,7 @@ public class BigQueryServicesImplTest {
new BigQueryServicesImpl.DatasetServiceImpl(
bigquery, null, PipelineOptionsFactory.create());
datasetService.getTable(
- tableRef, Collections.emptyList(), BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+ tableRef, Collections.emptyList(), null, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
}
@Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 3895c8c315c..cef93913401 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -125,6 +125,7 @@ public class BigQueryUtilTest {
when(mockClient.tables()).thenReturn(mockTables);
when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
when(mockTablesGet.setPrettyPrint(false)).thenReturn(mockTablesGet);
+ when(mockTablesGet.set(anyString(), anyString())).thenReturn(mockTablesGet);
when(mockTablesGet.execute()).thenReturn(table);
}
@@ -132,6 +133,7 @@ public class BigQueryUtilTest {
verify(mockClient).tables();
verify(mockTables).get("project", "dataset", "table");
verify(mockTablesGet, atLeastOnce()).setPrettyPrint(false);
+ verify(mockTablesGet, atLeastOnce()).set(anyString(), anyString());
verify(mockTablesGet, atLeastOnce()).execute();
}