You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/10/01 20:00:16 UTC

[beam] branch master updated: [BEAM-10930] Use dense JSON responses for BigQueryIO interactions (#12874)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b6cef8  [BEAM-10930] Use dense JSON responses for BigQueryIO interactions (#12874)
9b6cef8 is described below

commit 9b6cef8de30497b9014473327d8300dedd655f2d
Author: shollyman <sh...@google.com>
AuthorDate: Thu Oct 1 12:59:32 2020 -0700

    [BEAM-10930] Use dense JSON responses for BigQueryIO interactions (#12874)
    
    * [BEAM-10930] Use dense JSON responses for BigQueryIO interactions
    
    This updates requests made by the BigQuery IO connector to set the underlying
    `prettyPrint=false` GET parameter so that responses use dense JSON encoding.
    
    Without it, BigQueryIO receives pretty-printed JSON responses from the
    BigQuery service, which are significantly inflated by
    unnecessary whitespace bytes.
    
    * elide extra period
    
    * ran ./gradlew :sdks:java:io:google-cloud-platform:spotlessApply
    
    * update mocks
---
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 25 +++++++++++++++-------
 .../beam/sdk/io/gcp/bigquery/TestBigQuery.java     |  3 +++
 .../beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java | 13 ++++++++---
 3 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 278957e..5935a76 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -271,7 +271,7 @@ class BigQueryServicesImpl implements BigQueryServices {
       Exception lastException;
       do {
         try {
-          client.jobs().insert(jobRef.getProjectId(), job).execute();
+          client.jobs().insert(jobRef.getProjectId(), job).setPrettyPrint(false).execute();
           LOG.info(
               "Started BigQuery job: {}.\n{}",
               jobRef,
@@ -314,6 +314,7 @@ class BigQueryServicesImpl implements BigQueryServices {
                   .jobs()
                   .get(jobRef.getProjectId(), jobRef.getJobId())
                   .setLocation(jobRef.getLocation())
+                  .setPrettyPrint(false)
                   .execute();
           if (job == null) {
             LOG.info("Still waiting for BigQuery job {} to start", jobRef);
@@ -357,7 +358,7 @@ class BigQueryServicesImpl implements BigQueryServices {
               .setJobReference(jobRef)
               .setConfiguration(new JobConfiguration().setQuery(queryConfig).setDryRun(true));
       return executeWithRetries(
-              client.jobs().insert(projectId, job),
+              client.jobs().insert(projectId, job).setPrettyPrint(false),
               String.format(
                   "Unable to dry run query: %s, aborting after %d retries.",
                   queryConfig, MAX_RPC_RETRIES),
@@ -390,6 +391,7 @@ class BigQueryServicesImpl implements BigQueryServices {
               .jobs()
               .get(jobRef.getProjectId(), jobId)
               .setLocation(jobRef.getLocation())
+              .setPrettyPrint(false)
               .execute();
         } catch (GoogleJsonResponseException e) {
           if (errorExtractor.itemNotFound(e)) {
@@ -494,7 +496,10 @@ class BigQueryServicesImpl implements BigQueryServices {
         TableReference ref, @Nullable List<String> selectedFields, BackOff backoff, Sleeper sleeper)
         throws IOException, InterruptedException {
       Tables.Get get =
-          client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+          client
+              .tables()
+              .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId())
+              .setPrettyPrint(false);
       if (selectedFields != null && !selectedFields.isEmpty()) {
         get.setSelectedFields(String.join(",", selectedFields));
       }
@@ -557,6 +562,7 @@ class BigQueryServicesImpl implements BigQueryServices {
                   table.getTableReference().getProjectId(),
                   table.getTableReference().getDatasetId(),
                   table)
+              .setPrettyPrint(false)
               .execute();
         } catch (IOException e) {
           ApiErrorExtractor extractor = new ApiErrorExtractor();
@@ -622,7 +628,8 @@ class BigQueryServicesImpl implements BigQueryServices {
           executeWithRetries(
               client
                   .tabledata()
-                  .list(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
+                  .list(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId())
+                  .setPrettyPrint(false),
               String.format(
                   "Unable to list table data: %s, aborting after %d retries.",
                   tableRef.getTableId(), MAX_RPC_RETRIES),
@@ -643,7 +650,7 @@ class BigQueryServicesImpl implements BigQueryServices {
     public Dataset getDataset(String projectId, String datasetId)
         throws IOException, InterruptedException {
       return executeWithRetries(
-          client.datasets().get(projectId, datasetId),
+          client.datasets().get(projectId, datasetId).setPrettyPrint(false),
           String.format(
               "Unable to get dataset: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
@@ -703,7 +710,7 @@ class BigQueryServicesImpl implements BigQueryServices {
       Exception lastException;
       do {
         try {
-          client.datasets().insert(projectId, dataset).execute();
+          client.datasets().insert(projectId, dataset).setPrettyPrint(false).execute();
           return; // SUCCEEDED
         } catch (GoogleJsonResponseException e) {
           if (errorExtractor.itemAlreadyExists(e)) {
@@ -819,7 +826,8 @@ class BigQueryServicesImpl implements BigQueryServices {
             final Bigquery.Tabledata.InsertAll insert =
                 client
                     .tabledata()
-                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content);
+                    .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content)
+                    .setPrettyPrint(false);
 
             futures.add(
                 executor.submit(
@@ -983,7 +991,8 @@ class BigQueryServicesImpl implements BigQueryServices {
                   tableReference.getProjectId(),
                   tableReference.getDatasetId(),
                   tableReference.getTableId(),
-                  table),
+                  table)
+              .setPrettyPrint(false),
           String.format(
               "Unable to patch table description: %s, aborting after %d retries.",
               tableReference, MAX_RPC_RETRIES),
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
index 37ea573..a5a5218 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TestBigQuery.java
@@ -214,6 +214,7 @@ public class TestBigQuery implements TestRule {
             pipelineOptions.getTargetDataset(),
             table.getTableReference().getTableId(),
             new TableDataInsertAllRequest().setRows(bqRows))
+        .setPrettyPrint(false)
         .execute();
   }
 
@@ -281,6 +282,7 @@ public class TestBigQuery implements TestRule {
               pipelineOptions.getProject(),
               pipelineOptions.getTargetDataset(),
               table.getTableReference().getTableId())
+          .setPrettyPrint(false)
           .execute()
           .getSchema();
     } catch (IOException e) {
@@ -295,6 +297,7 @@ public class TestBigQuery implements TestRule {
               pipelineOptions.getProject(),
               pipelineOptions.getTargetDataset(),
               table.getTableReference().getTableId())
+          .setPrettyPrint(false)
           .execute()
           .getRows();
     } catch (IOException e) {
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 98b8bc8..deb1952 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -102,9 +102,13 @@ public class BigQueryUtilTest {
       responses.add(response);
     }
 
+    Bigquery.Tabledata.InsertAll mockInsertAll = mock(Bigquery.Tabledata.InsertAll.class);
+    when(mockTabledata.insertAll(
+            anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class)))
+        .thenReturn(mockInsertAll);
+
     doAnswer(
             invocation -> {
-              Bigquery.Tabledata.InsertAll mockInsertAll = mock(Bigquery.Tabledata.InsertAll.class);
               when(mockInsertAll.execute())
                   .thenReturn(
                       responses.get(0),
@@ -113,8 +117,8 @@ public class BigQueryUtilTest {
                           .toArray(new TableDataInsertAllResponse[responses.size() - 1]));
               return mockInsertAll;
             })
-        .when(mockTabledata)
-        .insertAll(anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class));
+        .when(mockInsertAll)
+        .setPrettyPrint(false);
   }
 
   private void verifyInsertAll(int expectedRetries) throws IOException {
@@ -126,18 +130,21 @@ public class BigQueryUtilTest {
   private void onTableGet(Table table) throws IOException {
     when(mockClient.tables()).thenReturn(mockTables);
     when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
+    when(mockTablesGet.setPrettyPrint(false)).thenReturn(mockTablesGet);
     when(mockTablesGet.execute()).thenReturn(table);
   }
 
   private void verifyTableGet() throws IOException {
     verify(mockClient).tables();
     verify(mockTables).get("project", "dataset", "table");
+    verify(mockTablesGet, atLeastOnce()).setPrettyPrint(false);
     verify(mockTablesGet, atLeastOnce()).execute();
   }
 
   private void onTableList(TableDataList result) throws IOException {
     when(mockClient.tabledata()).thenReturn(mockTabledata);
     when(mockTabledata.list(anyString(), anyString(), anyString())).thenReturn(mockTabledataList);
+    when(mockTabledataList.setPrettyPrint(false)).thenReturn(mockTabledataList);
     when(mockTabledataList.execute()).thenReturn(result);
   }