You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/19 23:16:00 UTC

[jira] [Commented] (BEAM-1306) Support additional configuration in BigQueryServices.insertAll()

    [ https://issues.apache.org/jira/browse/BEAM-1306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297575#comment-16297575 ] 

ASF GitHub Bot commented on BEAM-1306:
--------------------------------------

jkff closed pull request #2282: [BEAM-1306] Refactors BigQueryServices interface to allow the client to set options
URL: https://github.com/apache/beam/pull/2282
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 03e18e6561e..18e44e45d7e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -2559,8 +2559,9 @@ private void flushRows(TableReference tableReference,
             throws InterruptedException {
       if (!tableRows.isEmpty()) {
         try {
-          long totalBytes = bqServices.getDatasetService(options).insertAll(
-              tableReference, tableRows, uniqueIds);
+          DatasetService datasetService = bqServices.getDatasetService(options);
+          long totalBytes = datasetService.insertAll(tableReference,
+              datasetService.makeInsertBatches(tableRows, uniqueIds));
           byteCounter.inc(totalBytes);
         } catch (IOException e) {
           throw new RuntimeException(e);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index ebff6c1aecf..04675f2f68e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -26,10 +26,12 @@
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
@@ -160,13 +162,24 @@ void deleteDataset(String projectId, String datasetId)
         throws IOException, InterruptedException;
 
     /**
-     * Inserts {@link TableRow TableRows} with the specified insertIds if not null.
+     * Creates a {@link TableDataInsertAllRequest} containing the
+     * {@link TableRow TableRows} with the specified insertIds if not null.
      *
-     * <p>Returns the total bytes count of {@link TableRow TableRows}.
+     * @return the total bytes count of {@link TableRow TableRows}.
      */
-    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+    long insertAll(TableReference ref, Collection<TableDataInsertAllRequest> batches)
         throws IOException, InterruptedException;
 
+    /**
+     * Converts a list of {@link TableRow} to a list of {@link TableDataInsertAllRequest}.
+     *
+     * @param rowList List of {@link TableRow} to make into batches
+     * @param insertIdList Optional list of insert IDs for each row
+     * @return a list of {@link TableDataInsertAllRequest} batches
+     */
+    List<TableDataInsertAllRequest> makeInsertBatches(List<TableRow> rowList,
+        @Nullable List<String> insertIdList);
+
     /** Patch BigQuery {@link Table} description. */
     Table patchTableDescription(TableReference tableReference, @Nullable String tableDescription)
         throws IOException, InterruptedException;
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 15ca2620629..0a27fd7a9d3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
@@ -45,8 +46,10 @@
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
+
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -56,6 +59,7 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -648,13 +652,20 @@ public void deleteDataset(String projectId, String datasetId)
           ALWAYS_RETRY);
     }
 
-    @VisibleForTesting
-    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
+    private long executeBatches(TableReference ref, TableDataInsertAllRequest request,
         BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException {
       checkNotNull(ref, "ref");
       if (executor == null) {
         this.executor = options.as(GcsOptions.class).getExecutorService();
       }
+
+      List<String> insertIdList = new ArrayList<>();
+      List<TableDataInsertAllRequest.Rows> rowList = request.getRows();
+
+      for (TableDataInsertAllRequest.Rows row : rowList) {
+        insertIdList.add(row.getInsertId());
+      }
+
       if (insertIdList != null && rowList.size() != insertIdList.size()) {
         throw new AssertionError("If insertIdList is not null it needs to have at least "
             + "as many elements as rowList");
@@ -664,10 +675,10 @@ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String
       List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
       // These lists contain the rows to publish. Initially the contain the entire list.
       // If there are failures, they will contain only the failed rows to be retried.
-      List<TableRow> rowsToPublish = rowList;
+      List<TableDataInsertAllRequest.Rows> rowsToPublish = rowList;
       List<String> idsToPublish = insertIdList;
       while (true) {
-        List<TableRow> retryRows = new ArrayList<>();
+        List<TableDataInsertAllRequest.Rows> retryRows = new ArrayList<>();
         List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
 
         int strideIndex = 0;
@@ -679,15 +690,10 @@ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String
         List<Integer> strideIndices = new ArrayList<>();
 
         for (int i = 0; i < rowsToPublish.size(); ++i) {
-          TableRow row = rowsToPublish.get(i);
-          TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
-          if (idsToPublish != null) {
-            out.setInsertId(idsToPublish.get(i));
-          }
-          out.setJson(row.getUnknownKeys());
-          rows.add(out);
+          TableDataInsertAllRequest.Rows row = rowsToPublish.get(i);
+          rows.add(row);
 
-          dataSize += row.toString().length();
+          dataSize += row.getJson().toString().length();
           if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
               || i == rowsToPublish.size() - 1) {
             TableDataInsertAllRequest content = new TableDataInsertAllRequest();
@@ -782,12 +788,52 @@ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String
       }
     }
 
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>This implementation returns a single batch of {@link TableDataInsertAllRequest}
+     * with the actual batching implementation handled in executeBatches.
+     */
     @Override
-    public long insertAll(
-        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+    public List<TableDataInsertAllRequest> makeInsertBatches(List<TableRow> rowList,
+        @Nullable List<String> insertIdList) {
+
+      if (insertIdList != null && rowList.size() != insertIdList.size()) {
+        throw new AssertionError("If insertIdList is not null it needs to have at least "
+            + "as many elements as rowList");
+      }
+
+      TableDataInsertAllRequest request = new TableDataInsertAllRequest();
+      request.setRows(new ArrayList<TableDataInsertAllRequest.Rows>());
+      List<TableDataInsertAllRequest> batches = new ArrayList<>();
+      batches.add(request);
+
+      for (int i = 0; i < rowList.size(); ++i) {
+        TableDataInsertAllRequest.Rows row = new TableDataInsertAllRequest.Rows();
+        row.setJson(rowList.get(i));
+        if (insertIdList != null) {
+          row.setInsertId(insertIdList.get(i));
+        }
+        request.getRows().add(row);
+      }
+
+      return batches;
+    }
+
+    @Override
+    public long insertAll(TableReference ref, Collection<TableDataInsertAllRequest> batches)
         throws IOException, InterruptedException {
-      return insertAll(
-          ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+      return insertAll(ref, batches, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+    }
+
+    @VisibleForTesting
+    long insertAll(TableReference ref, Collection<TableDataInsertAllRequest> batches,
+        BackOff backOff, Sleeper sleeper) throws IOException, InterruptedException {
+      checkArgument(batches.size() == 1,
+          "parameter batches was expected to have size equal to 1");
+      TableDataInsertAllRequest request = new ArrayList<>(batches).get(0);
+      return executeBatches(ref, request, backOff, sleeper);
     }
 
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index dcc38007377..b81ea558fd3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -36,7 +36,6 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.when;
-
 import com.google.api.client.json.GenericJson;
 import com.google.api.client.util.Data;
 import com.google.api.services.bigquery.model.Dataset;
@@ -52,6 +51,7 @@
 import com.google.api.services.bigquery.model.JobStatistics4;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -630,22 +630,53 @@ public void deleteDataset(String projectId, String datasetId)
 
     @Override
     public long insertAll(
-        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+        TableReference ref, Collection<TableDataInsertAllRequest> batches)
         throws IOException, InterruptedException {
       synchronized (tables) {
-        assertEquals(rowList.size(), insertIdList.size());
 
         long dataSize = 0;
-        TableContainer tableContainer = getTableContainer(
-            ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
-        for (int i = 0; i < rowList.size(); ++i) {
-          tableContainer.addRow(rowList.get(i), insertIdList.get(i));
-          dataSize += rowList.get(i).toString().length();
+        for (TableDataInsertAllRequest batch : batches) {
+          List<TableDataInsertAllRequest.Rows> rows = batch.getRows();
+          TableContainer tableContainer = getTableContainer(
+              ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+          for (int i = 0; i < rows.size(); ++i) {
+            TableRow tableRow = new TableRow();
+            tableRow.setUnknownKeys(rows.get(i).getJson());
+            tableContainer.addRow(tableRow, rows.get(i).getInsertId());
+            dataSize += tableRow.toString().length();
+          }
         }
         return dataSize;
       }
     }
 
+    @Override
+    public List<TableDataInsertAllRequest> makeInsertBatches(List<TableRow> rowList,
+        @Nullable List<String> insertIdList) {
+
+      if (insertIdList != null && rowList.size() != insertIdList.size()) {
+        throw new AssertionError("If insertIdList is not null it needs to have at least "
+            + "as many elements as rowList");
+      }
+
+      // This implementation puts everything in one batch
+      TableDataInsertAllRequest request = new TableDataInsertAllRequest();
+      request.setRows(new ArrayList<TableDataInsertAllRequest.Rows>());
+      ArrayList<TableDataInsertAllRequest> batches = new ArrayList<>();
+      batches.add(request);
+
+      for (int i = 0; i < rowList.size(); ++i) {
+        TableDataInsertAllRequest.Rows row = new TableDataInsertAllRequest.Rows();
+        row.setJson(rowList.get(i).getUnknownKeys());
+        if (insertIdList != null) {
+          row.setInsertId(insertIdList.get(i));
+        }
+        request.getRows().add(row);
+      }
+
+      return batches;
+    }
+
     @Override
     public Table patchTableDescription(TableReference tableReference,
                                        @Nullable String tableDescription)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index ef51650633c..1dd626daffc 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -500,7 +500,8 @@ public void testInsertRetry() throws Exception {
 
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-    dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+    dataService.insertAll(ref, dataService.makeInsertBatches(rows, null),
+        TEST_BACKOFF.backoff(), new MockSleeper());
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -536,7 +537,9 @@ public void testInsertRetrySelectRows() throws Exception {
 
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-    dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper());
+
+    dataService.insertAll(ref, dataService.makeInsertBatches(rows, insertIds),
+        TEST_BACKOFF.backoff(), new MockSleeper());
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -577,7 +580,8 @@ public InputStream answer(InvocationOnMock invocation) throws Throwable {
 
     // Expect it to fail.
     try {
-      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+      dataService.insertAll(ref, dataService.makeInsertBatches(rows, null),
+          TEST_BACKOFF.backoff(), new MockSleeper());
       fail();
     } catch (IOException e) {
       assertThat(e, instanceOf(IOException.class));
@@ -617,7 +621,8 @@ public void testInsertDoesNotRetry() throws Throwable {
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
 
     try {
-      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+      dataService.insertAll(ref, dataService.makeInsertBatches(rows, null),
+          TEST_BACKOFF.backoff(), new MockSleeper());
       fail();
     } catch (RuntimeException e) {
       verify(response, times(1)).getStatusCode();
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 7b5b226d192..5ea877ea9f3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -400,7 +400,7 @@ public void testInsertAll() throws Exception, IOException {
 
     long totalBytes = 0;
     try {
-      totalBytes = datasetService.insertAll(ref, rows, ids);
+      totalBytes = datasetService.insertAll(ref, datasetService.makeInsertBatches(rows, ids));
     } finally {
       verifyInsertAll(5);
       // Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Support additional configuration in BigQueryServices.insertAll()
> ----------------------------------------------------------------
>
>                 Key: BEAM-1306
>                 URL: https://issues.apache.org/jira/browse/BEAM-1306
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-extensions
>            Reporter: Pei He
>            Assignee: Pei He
>
> ignoreUnknownValues is requested in https://issues.apache.org/jira/browse/BEAM-1267
> There are additional configurations that could be useful.
> TableDataInsertAllRequest content = new TableDataInsertAllRequest();
> content.setSkipInvalidRows();
> content.setTemplateSuffix();
> content.setKind();
>             
> I think we can improve the BigQueryServices interface by defining it as:
> void insertAll(TableReference ref, Collection<TableDataInsertAllRequest> request);
> and provide a static method to prepare requests:
> List<TableDataInsertAllRequest> makeInsertBatches(List<TableRow> rowList, @Nullable List<String> insertIdList);
> Then the client can set additional config on the requests in the returned list.
> Reference:
> GcsUtil.java has similar implementation in makeCopyBatches() and executeBatches().
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java#L550



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)