You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2022/06/18 01:35:26 UTC

[beam] branch release-2.40.0 updated: BigQueryIO: Adding the BASIC view setting to getTable request (#21879)

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch release-2.40.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.40.0 by this push:
     new 91ae5bebe55 BigQueryIO: Adding the BASIC view setting to getTable request  (#21879)
     new 6afcb5bbc71 Merge pull request #21938 from [cherry-pick][release-2.40.0] BigQueryIO: Adding the BASIC view setting to getTable request  (#21879)
91ae5bebe55 is described below

commit 91ae5bebe559ccdf45ad00d911259534c12931e3
Author: pablo rodriguez defino <pr...@gmail.com>
AuthorDate: Fri Jun 17 16:50:43 2022 -0700

    BigQueryIO: Adding the BASIC view setting to getTable request  (#21879)
    
    * Adding the BASIC view to the getTable request to avoid storage-related calculations on the backend (which the default view option triggers)
    
    * adding new method for getTable using view param to make the change backwards compatible
    
    * spotless apply
    
    * changed create table ptransform to use BASIC view as well
    
    * replaced a couple more occurrences that can take advantage of just requesting the BASIC view
    
    * addressing comments from review
    
    * fixed flaky test
    
    * ran spotless apply
---
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java      |  5 ++++-
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java     | 14 ++++++++++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 18 ++++++++++++++++--
 .../beam/sdk/io/gcp/bigquery/CreateTableHelpers.java   |  4 +++-
 .../beam/sdk/io/gcp/bigquery/TableSchemaCache.java     | 15 +++++++++++++--
 .../beam/sdk/io/gcp/testing/FakeDatasetService.java    | 17 +++++++++++++++++
 .../sdk/io/gcp/bigquery/BigQueryServicesImplTest.java  |  6 +++---
 .../beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java     |  2 ++
 8 files changed, 72 insertions(+), 9 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 3a37a36e7ee..0a62245390b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -33,6 +33,7 @@ import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -516,7 +517,9 @@ public class BigQueryHelpers {
 
   static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) {
     try {
-      if (datasetService.getTable(tableRef) != null) {
+      if (datasetService.getTable(
+              tableRef, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+          != null) {
         checkState(
             datasetService.isTableEmpty(tableRef),
             "BigQuery table is not empty: %s.",
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index dd4da2bd7fc..a0484ccf972 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -111,6 +111,16 @@ public interface BigQueryServices extends Serializable {
 
   /** An interface to get, create and delete Cloud BigQuery datasets and tables. */
   public interface DatasetService extends AutoCloseable {
+
+    // Mirrors the TableMetadataView values listed in the tables.get REST reference; see
+    // https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get#TableMetadataView
+    enum TableMetadataView {
+      TABLE_METADATA_VIEW_UNSPECIFIED,
+      BASIC,
+      STORAGE_STATS,
+      FULL;
+    };
+
     /**
      * Gets the specified {@link Table} resource by table ID.
      *
@@ -123,6 +133,10 @@ public interface BigQueryServices extends Serializable {
     Table getTable(TableReference tableRef, List<String> selectedFields)
         throws InterruptedException, IOException;
 
+    @Nullable
+    Table getTable(TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+        throws InterruptedException, IOException;
+
     /** Creates the specified table if it does not exist. */
     void createTable(Table table) throws InterruptedException, IOException;
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 50f3988c1c1..a50947a4915 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -616,13 +616,24 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public @Nullable Table getTable(TableReference tableRef, List<String> selectedFields)
         throws IOException, InterruptedException {
-      return getTable(tableRef, selectedFields, createDefaultBackoff(), Sleeper.DEFAULT);
+      return getTable(tableRef, selectedFields, TableMetadataView.STORAGE_STATS);
+    }
+
+    @Override
+    public @Nullable Table getTable(
+        TableReference tableRef, List<String> selectedFields, TableMetadataView view)
+        throws IOException, InterruptedException {
+      return getTable(tableRef, selectedFields, view, createDefaultBackoff(), Sleeper.DEFAULT);
     }
 
     @VisibleForTesting
     @Nullable
     Table getTable(
-        TableReference ref, List<String> selectedFields, BackOff backoff, Sleeper sleeper)
+        TableReference ref,
+        List<String> selectedFields,
+        TableMetadataView view,
+        BackOff backoff,
+        Sleeper sleeper)
         throws IOException, InterruptedException {
       Tables.Get get =
           client
@@ -632,6 +643,9 @@ class BigQueryServicesImpl implements BigQueryServices {
       if (!selectedFields.isEmpty()) {
         get.setSelectedFields(String.join(",", selectedFields));
       }
+      if (view != null) {
+        get.set("view", view.name());
+      }
       try {
         return executeWithRetries(
             get,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
index 403554b263c..426dfbb165e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/CreateTableHelpers.java
@@ -112,7 +112,9 @@ public class CreateTableHelpers {
     tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
     try (DatasetService datasetService =
         bqServices.getDatasetService(context.getPipelineOptions().as(BigQueryOptions.class))) {
-      if (datasetService.getTable(tableReference) == null) {
+      if (datasetService.getTable(
+              tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC)
+          == null) {
         TableSchema tableSchema = schemaSupplier.get();
         Preconditions.checkArgumentNotNull(
             tableSchema,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
index c7fca47a287..ba6063def1c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableSchemaCache.java
@@ -22,6 +22,7 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.value.AutoValue;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
@@ -166,7 +167,12 @@ public class TableSchemaCache {
     if (!schemaHolder.isPresent()) {
       // Not initialized. Query the new schema with the monitor released and then update the cache.
       try {
-        @Nullable Table table = datasetService.getTable(tableReference);
+        // Requesting the BASIC view prevents the BQ backend from running calculations
+        // related to storage stats, which are not needed here.
+        @Nullable
+        Table table =
+            datasetService.getTable(
+                tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
         schemaHolder =
             Optional.ofNullable((table == null) ? null : SchemaHolder.of(table.getSchema(), 0));
       } catch (Exception e) {
@@ -298,7 +304,12 @@ public class TableSchemaCache {
     Map<String, TableSchema> schemas = Maps.newHashMapWithExpectedSize(tables.size());
     for (Map.Entry<String, Refresh> entry : tables.entrySet()) {
       TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey());
-      Table table = entry.getValue().getDatasetService().getTable(tableReference);
+      Table table =
+          entry
+              .getValue()
+              .getDatasetService()
+              .getTable(
+                  tableReference, Collections.emptyList(), DatasetService.TableMetadataView.BASIC);
       if (table == null) {
         throw new RuntimeException("Did not get value for table " + tableReference);
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
index 1b4d03ddd01..213d6498a0e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/testing/FakeDatasetService.java
@@ -173,6 +173,23 @@ public class FakeDatasetService implements DatasetService, Serializable {
   @Override
   public Table getTable(TableReference tableRef, @Nullable List<String> selectedFields)
       throws InterruptedException, IOException {
+    return getTable(tableRef, selectedFields, null);
+  }
+
+  @Override
+  public Table getTable(
+      TableReference tableRef,
+      @Nullable List<String> selectedFields,
+      @Nullable TableMetadataView view)
+      throws InterruptedException, IOException {
+    return getTableImpl(tableRef, selectedFields, view);
+  }
+
+  public Table getTableImpl(
+      TableReference tableRef,
+      @Nullable List<String> selectedFields,
+      @Nullable TableMetadataView view)
+      throws InterruptedException, IOException {
     synchronized (FakeDatasetService.class) {
       Map<String, TableContainer> dataset =
           tables.get(tableRef.getProjectId(), tableRef.getDatasetId());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 6007d6a11bd..a57299b0a0d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -474,7 +474,7 @@ public class BigQueryServicesImplTest {
 
     Table table =
         datasetService.getTable(
-            tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+            tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
 
     assertEquals(testTable, table);
     verifyAllResponsesAreRead();
@@ -499,7 +499,7 @@ public class BigQueryServicesImplTest {
             .setTableId("tableId");
     Table table =
         datasetService.getTable(
-            tableRef, Collections.emptyList(), BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+            tableRef, Collections.emptyList(), null, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
 
     assertNull(table);
     verifyAllResponsesAreRead();
@@ -526,7 +526,7 @@ public class BigQueryServicesImplTest {
         new BigQueryServicesImpl.DatasetServiceImpl(
             bigquery, null, PipelineOptionsFactory.create());
     datasetService.getTable(
-        tableRef, Collections.emptyList(), BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+        tableRef, Collections.emptyList(), null, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
   }
 
   @Test
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 3895c8c315c..cef93913401 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -125,6 +125,7 @@ public class BigQueryUtilTest {
     when(mockClient.tables()).thenReturn(mockTables);
     when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
     when(mockTablesGet.setPrettyPrint(false)).thenReturn(mockTablesGet);
+    when(mockTablesGet.set(anyString(), anyString())).thenReturn(mockTablesGet);
     when(mockTablesGet.execute()).thenReturn(table);
   }
 
@@ -132,6 +133,7 @@ public class BigQueryUtilTest {
     verify(mockClient).tables();
     verify(mockTables).get("project", "dataset", "table");
     verify(mockTablesGet, atLeastOnce()).setPrettyPrint(false);
+    verify(mockTablesGet, atLeastOnce()).set(anyString(), anyString());
     verify(mockTablesGet, atLeastOnce()).execute();
   }