You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/27 20:14:26 UTC

[1/2] incubator-beam git commit: Closes #1018

Repository: incubator-beam
Updated Branches:
  refs/heads/master 1556eb77f -> 900980246


Closes #1018


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90098024
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90098024
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90098024

Branch: refs/heads/master
Commit: 900980246b3ee3bdda8009672d5092a9260f06ba
Parents: 1556eb7 e4b98fd
Author: Dan Halperin <dh...@google.com>
Authored: Tue Sep 27 13:14:17 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Sep 27 13:14:17 2016 -0700

----------------------------------------------------------------------
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 23 +++--
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 93 +++++++++++++++++++-
 2 files changed, 105 insertions(+), 11 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: BigQueryServicesImpl: fix issues in insertAll and add better tests

Posted by dh...@apache.org.
BigQueryServicesImpl: fix issues in insertAll and add better tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4b98fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4b98fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4b98fd3

Branch: refs/heads/master
Commit: e4b98fd39a896a6d3d386d64612f75adab76af8e
Parents: 1556eb7
Author: Dan Halperin <dh...@google.com>
Authored: Tue Sep 27 10:47:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Sep 27 13:14:17 2016 -0700

----------------------------------------------------------------------
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 23 +++--
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 93 +++++++++++++++++++-
 2 files changed, 105 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4b98fd3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 7d98401..3862382 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -543,10 +543,9 @@ class BigQueryServicesImpl implements BigQueryServices {
           backoff);
     }
 
-    @Override
-    public long insertAll(
-        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
-        throws IOException, InterruptedException {
+    @VisibleForTesting
+    long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
+        BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException {
       checkNotNull(ref, "ref");
       if (executor == null) {
         this.executor = options.as(GcsOptions.class).getExecutorService();
@@ -556,8 +555,6 @@ class BigQueryServicesImpl implements BigQueryServices {
             + "as many elements as rowList");
       }
 
-      BackOff backoff = INSERT_BACKOFF_FACTORY.backoff();
-
       long retTotalDataSize = 0;
       List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
       // These lists contain the rows to publish. Initially the contain the entire list.
@@ -607,7 +604,7 @@ class BigQueryServicesImpl implements BigQueryServices {
                         if (new ApiErrorExtractor().rateLimited(e)) {
                           LOG.info("BigQuery insertAll exceeded rate limit, retrying");
                           try {
-                            Thread.sleep(backoff.nextBackOffMillis());
+                            sleeper.sleep(backoff.nextBackOffMillis());
                           } catch (InterruptedException interrupted) {
                             throw new IOException(
                                 "Interrupted while waiting before retrying insertAll");
@@ -662,16 +659,16 @@ class BigQueryServicesImpl implements BigQueryServices {
           break;
         }
         try {
-          Thread.sleep(backoff.nextBackOffMillis());
+          sleeper.sleep(nextBackoffMillis);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new IOException(
               "Interrupted while waiting before retrying insert of " + retryRows);
         }
-        LOG.info("Retrying failed inserts to BigQuery");
         rowsToPublish = retryRows;
         idsToPublish = retryIds;
         allErrors.clear();
+        LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
       }
       if (!allErrors.isEmpty()) {
         throw new IOException("Insert failed: " + allErrors);
@@ -679,6 +676,14 @@ class BigQueryServicesImpl implements BigQueryServices {
         return retTotalDataSize;
       }
     }
+
+    @Override
+    public long insertAll(
+        TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+        throws IOException, InterruptedException {
+          return insertAll(
+              ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+    }
   }
 
   private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4b98fd3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index fb472fc..0e76660 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -33,6 +36,7 @@ import com.google.api.client.json.Json;
 import com.google.api.client.json.jackson2.JacksonFactory;
 import com.google.api.client.testing.http.MockHttpTransport;
 import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.util.MockSleeper;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
@@ -42,6 +46,7 @@ import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
@@ -67,6 +72,8 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 /**
  * Tests for {@link BigQueryServicesImpl}.
@@ -345,7 +352,7 @@ public class BigQueryServicesImplTest {
 
     DatasetServiceImpl dataService =
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
-    dataService.insertAll(ref, rows, null);
+    dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
     verify(response, times(2)).getStatusCode();
     verify(response, times(2)).getContent();
     verify(response, times(2)).getContentType();
@@ -353,6 +360,88 @@ public class BigQueryServicesImplTest {
   }
 
   /**
+   * Tests that {@link DatasetServiceImpl#insertAll} retries selected rows on failure.
+   */
+  @Test
+  public void testInsertRetrySelectRows() throws Exception {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = ImmutableList.of(
+        new TableRow().set("row", "a"), new TableRow().set("row", "b"));
+    List<String> insertIds = ImmutableList.of("a", "b");
+
+    final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse()
+        .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));
+
+    final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(bFailed)).thenReturn(toStream(allRowsSucceeded));
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper());
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
+  }
+
+  // A BackOff that makes a total of 4 attempts
+  private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3);
+
+  /**
+   * Tests that {@link DatasetServiceImpl#insertAll} fails gracefully on persistent issues.
+   */
+  @Test
+  public void testInsertFailsGracefully() throws Exception {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = ImmutableList.of(new TableRow(), new TableRow());
+
+    final TableDataInsertAllResponse row1Failed = new TableDataInsertAllResponse()
+        .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));
+
+    final TableDataInsertAllResponse row0Failed = new TableDataInsertAllResponse()
+        .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(0L)));
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    // Always return 200.
+    when(response.getStatusCode()).thenReturn(200);
+    // Return row 1 failing, then we retry row 1 as row 0, and row 0 persistently fails.
+    when(response.getContent())
+        .thenReturn(toStream(row1Failed))
+        .thenAnswer(new Answer<InputStream>() {
+          @Override
+          public InputStream answer(InvocationOnMock invocation) throws Throwable {
+            return toStream(row0Failed);
+          }
+        });
+
+
+    DatasetServiceImpl dataService =
+        new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    // Expect it to fail.
+    try {
+      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+      fail();
+    } catch (IOException e) {
+      assertThat(e, instanceOf(IOException.class));
+      assertThat(e.getMessage(), containsString("Insert failed:"));
+      assertThat(e.getMessage(), containsString("[{\"index\":0}]"));
+    }
+
+    // Verify the exact number of retries as well as log messages.
+    verify(response, times(4)).getStatusCode();
+    verify(response, times(4)).getContent();
+    verify(response, times(4)).getContentType();
+    expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
+  }
+
+  /**
    * Tests that {@link DatasetServiceImpl#insertAll} does not retry non-rate-limited attempts.
    */
   @Test
@@ -377,7 +466,7 @@ public class BigQueryServicesImplTest {
         new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
 
     try {
-      dataService.insertAll(ref, rows, null);
+      dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
       fail();
     } catch (RuntimeException e) {
       verify(response, times(1)).getStatusCode();