You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/11 02:14:21 UTC
[1/2] beam git commit: [BEAM-1258] DONT_RETRY_NOT_FOUND in BigQueryServicesImpl.isTableEmpty().
Repository: beam
Updated Branches:
refs/heads/master 68b4c34a4 -> 055f452c0
[BEAM-1258] DONT_RETRY_NOT_FOUND in BigQueryServicesImpl.isTableEmpty().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3490a36e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3490a36e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3490a36e
Branch: refs/heads/master
Commit: 3490a36e5877227943c5635b4d92706f1d128d22
Parents: 68b4c34
Author: Pei He <pe...@google.com>
Authored: Tue Jan 10 10:57:38 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 10 18:13:52 2017 -0800
----------------------------------------------------------------------
.../io/gcp/bigquery/BigQueryServicesImpl.java | 18 +++--
.../gcp/bigquery/BigQueryServicesImplTest.java | 74 ++++++++++++++++++++
2 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3490a36e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c4c7344..2098148 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -527,14 +527,24 @@ class BigQueryServicesImpl implements BigQueryServices {
BackOff backoff =
FluentBackoff.DEFAULT
.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
+ return isTableEmpty(
+ new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId),
+ backoff,
+ Sleeper.DEFAULT);
+ }
+
+ @VisibleForTesting
+ boolean isTableEmpty(TableReference tableRef, BackOff backoff, Sleeper sleeper)
+ throws IOException, InterruptedException {
TableDataList dataList = executeWithRetries(
- client.tabledata().list(projectId, datasetId, tableId),
+ client.tabledata().list(
+ tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
String.format(
"Unable to list table data: %s, aborting after %d retries.",
- tableId, MAX_RPC_RETRIES),
- Sleeper.DEFAULT,
+ tableRef.getTableId(), MAX_RPC_RETRIES),
+ sleeper,
backoff,
- ALWAYS_RETRY);
+ DONT_RETRY_NOT_FOUND);
return dataList.getRows() == null || dataList.getRows().isEmpty();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3490a36e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index bfd1319..1ce10f1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Verify.verifyNotNull;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -49,6 +50,7 @@ import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
+import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
@@ -385,6 +387,78 @@ public class BigQueryServicesImplTest {
}
@Test
+ public void testIsTableEmptySucceeds() throws Exception {
+ TableReference tableRef = new TableReference()
+ .setProjectId("projectId")
+ .setDatasetId("datasetId")
+ .setTableId("tableId");
+
+ TableDataList testDataList = new TableDataList()
+ .setRows(ImmutableList.of(new TableRow()));
+
+ // First response is 403 rate limited, second response has valid payload.
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+ when(response.getContent())
+ .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+ .thenReturn(toStream(testDataList));
+
+ BigQueryServicesImpl.DatasetServiceImpl datasetService =
+ new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+ assertFalse(
+ datasetService.isTableEmpty(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT));
+
+ verify(response, times(2)).getStatusCode();
+ verify(response, times(2)).getContent();
+ verify(response, times(2)).getContentType();
+ }
+
+ @Test
+ public void testIsTableEmptyNoRetryForNotFound() throws IOException, InterruptedException {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(404);
+
+ BigQueryServicesImpl.DatasetServiceImpl datasetService =
+ new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+ TableReference tableRef = new TableReference()
+ .setProjectId("projectId")
+ .setDatasetId("datasetId")
+ .setTableId("tableId");
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage(String.format("Unable to list table data: %s", tableRef.getTableId()));
+
+ try {
+ datasetService.isTableEmpty(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+ } finally {
+ verify(response, times(1)).getStatusCode();
+ verify(response, times(1)).getContent();
+ verify(response, times(1)).getContentType();
+ }
+ }
+
+ @Test
+ public void testIsTableEmptyThrows() throws Exception {
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+ when(response.getStatusCode()).thenReturn(401);
+
+ TableReference tableRef = new TableReference()
+ .setProjectId("projectId")
+ .setDatasetId("datasetId")
+ .setTableId("tableId");
+
+ BigQueryServicesImpl.DatasetServiceImpl datasetService =
+ new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage(String.format("Unable to list table data: %s", tableRef.getTableId()));
+
+ datasetService.isTableEmpty(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+ }
+
+ @Test
public void testExecuteWithRetries() throws IOException, InterruptedException {
Table testTable = new Table();
[2/2] beam git commit: This closes #1759
Posted by dh...@apache.org.
This closes #1759
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/055f452c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/055f452c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/055f452c
Branch: refs/heads/master
Commit: 055f452c0a2f62920f58928e66aebe21744eaf41
Parents: 68b4c34 3490a36
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jan 10 18:14:09 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 10 18:14:09 2017 -0800
----------------------------------------------------------------------
.../io/gcp/bigquery/BigQueryServicesImpl.java | 18 +++--
.../gcp/bigquery/BigQueryServicesImplTest.java | 74 ++++++++++++++++++++
2 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------