You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/27 20:14:27 UTC
[2/2] incubator-beam git commit: BigQueryServicesImpl: fix issues in
insertAll and add better tests
BigQueryServicesImpl: fix issues in insertAll and add better tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4b98fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4b98fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4b98fd3
Branch: refs/heads/master
Commit: e4b98fd39a896a6d3d386d64612f75adab76af8e
Parents: 1556eb7
Author: Dan Halperin <dh...@google.com>
Authored: Tue Sep 27 10:47:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Sep 27 13:14:17 2016 -0700
----------------------------------------------------------------------
.../io/gcp/bigquery/BigQueryServicesImpl.java | 23 +++--
.../gcp/bigquery/BigQueryServicesImplTest.java | 93 +++++++++++++++++++-
2 files changed, 105 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4b98fd3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 7d98401..3862382 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -543,10 +543,9 @@ class BigQueryServicesImpl implements BigQueryServices {
backoff);
}
- @Override
- public long insertAll(
- TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
- throws IOException, InterruptedException {
+ @VisibleForTesting
+ long insertAll(TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList,
+ BackOff backoff, final Sleeper sleeper) throws IOException, InterruptedException {
checkNotNull(ref, "ref");
if (executor == null) {
this.executor = options.as(GcsOptions.class).getExecutorService();
@@ -556,8 +555,6 @@ class BigQueryServicesImpl implements BigQueryServices {
+ "as many elements as rowList");
}
- BackOff backoff = INSERT_BACKOFF_FACTORY.backoff();
-
long retTotalDataSize = 0;
List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
// These lists contain the rows to publish. Initially the contain the entire list.
@@ -607,7 +604,7 @@ class BigQueryServicesImpl implements BigQueryServices {
if (new ApiErrorExtractor().rateLimited(e)) {
LOG.info("BigQuery insertAll exceeded rate limit, retrying");
try {
- Thread.sleep(backoff.nextBackOffMillis());
+ sleeper.sleep(backoff.nextBackOffMillis());
} catch (InterruptedException interrupted) {
throw new IOException(
"Interrupted while waiting before retrying insertAll");
@@ -662,16 +659,16 @@ class BigQueryServicesImpl implements BigQueryServices {
break;
}
try {
- Thread.sleep(backoff.nextBackOffMillis());
+ sleeper.sleep(nextBackoffMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(
"Interrupted while waiting before retrying insert of " + retryRows);
}
- LOG.info("Retrying failed inserts to BigQuery");
rowsToPublish = retryRows;
idsToPublish = retryIds;
allErrors.clear();
+ LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size());
}
if (!allErrors.isEmpty()) {
throw new IOException("Insert failed: " + allErrors);
@@ -679,6 +676,14 @@ class BigQueryServicesImpl implements BigQueryServices {
return retTotalDataSize;
}
}
+
+ @Override
+ public long insertAll(
+ TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
+ throws IOException, InterruptedException {
+ return insertAll(
+ ref, rowList, insertIdList, INSERT_BACKOFF_FACTORY.backoff(), Sleeper.DEFAULT);
+ }
}
private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4b98fd3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index fb472fc..0e76660 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -33,6 +36,7 @@ import com.google.api.client.json.Json;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.testing.util.MockSleeper;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
@@ -42,6 +46,7 @@ import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
@@ -67,6 +72,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
/**
* Tests for {@link BigQueryServicesImpl}.
@@ -345,7 +352,7 @@ public class BigQueryServicesImplTest {
DatasetServiceImpl dataService =
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
- dataService.insertAll(ref, rows, null);
+ dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
@@ -353,6 +360,88 @@ public class BigQueryServicesImplTest {
}
/**
+ * Tests that {@link DatasetServiceImpl#insertAll} retries selected rows on failure.
+ */
+ @Test
+ public void testInsertRetrySelectRows() throws Exception {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<TableRow> rows = ImmutableList.of(
+ new TableRow().set("row", "a"), new TableRow().set("row", "b"));
+ List<String> insertIds = ImmutableList.of("a", "b");
+
+ final TableDataInsertAllResponse bFailed = new TableDataInsertAllResponse()
+ .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));
+
+ final TableDataInsertAllResponse allRowsSucceeded = new TableDataInsertAllResponse();
+
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(200).thenReturn(200);
+ when(response.getContent())
+ .thenReturn(toStream(bFailed)).thenReturn(toStream(allRowsSucceeded));
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+ dataService.insertAll(ref, rows, insertIds, TEST_BACKOFF.backoff(), new MockSleeper());
+ verify(response, times(2)).getStatusCode();
+ verify(response, times(2)).getContent();
+ verify(response, times(2)).getContentType();
+ expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
+ }
+
+ // A BackOff that makes a total of 4 attempts
+ private static final FluentBackoff TEST_BACKOFF = FluentBackoff.DEFAULT.withMaxRetries(3);
+
+ /**
+ * Tests that {@link DatasetServiceImpl#insertAll} fails gracefully when insert errors persist.
+ */
+ @Test
+ public void testInsertFailsGracefully() throws Exception {
+ TableReference ref =
+ new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+ List<TableRow> rows = ImmutableList.of(new TableRow(), new TableRow());
+
+ final TableDataInsertAllResponse row1Failed = new TableDataInsertAllResponse()
+ .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(1L)));
+
+ final TableDataInsertAllResponse row0Failed = new TableDataInsertAllResponse()
+ .setInsertErrors(ImmutableList.of(new InsertErrors().setIndex(0L)));
+
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ // Always return 200.
+ when(response.getStatusCode()).thenReturn(200);
+ // Return row 1 failing, then we retry row 1 as row 0, and row 0 persistently fails.
+ when(response.getContent())
+ .thenReturn(toStream(row1Failed))
+ .thenAnswer(new Answer<InputStream>() {
+ @Override
+ public InputStream answer(InvocationOnMock invocation) throws Throwable {
+ return toStream(row0Failed);
+ }
+ });
+
+
+ DatasetServiceImpl dataService =
+ new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+ // Expect it to fail.
+ try {
+ dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
+ fail();
+ } catch (IOException e) {
+ assertThat(e, instanceOf(IOException.class));
+ assertThat(e.getMessage(), containsString("Insert failed:"));
+ assertThat(e.getMessage(), containsString("[{\"index\":0}]"));
+ }
+
+ // Verify the exact number of retries as well as log messages.
+ verify(response, times(4)).getStatusCode();
+ verify(response, times(4)).getContent();
+ verify(response, times(4)).getContentType();
+ expectedLogs.verifyInfo("Retrying 1 failed inserts to BigQuery");
+ }
+
+ /**
* Tests that {@link DatasetServiceImpl#insertAll} does not retry non-rate-limited attempts.
*/
@Test
@@ -377,7 +466,7 @@ public class BigQueryServicesImplTest {
new DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
try {
- dataService.insertAll(ref, rows, null);
+ dataService.insertAll(ref, rows, null, TEST_BACKOFF.backoff(), new MockSleeper());
fail();
} catch (RuntimeException e) {
verify(response, times(1)).getStatusCode();