You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/11 02:14:21 UTC

[1/2] beam git commit: [BEAM-1258] DONT_RETRY_NOT_FOUND in BigQueryServicesImpl.isTableEmpty().

Repository: beam
Updated Branches:
  refs/heads/master 68b4c34a4 -> 055f452c0


[BEAM-1258] DONT_RETRY_NOT_FOUND in BigQueryServicesImpl.isTableEmpty().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3490a36e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3490a36e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3490a36e

Branch: refs/heads/master
Commit: 3490a36e5877227943c5635b4d92706f1d128d22
Parents: 68b4c34
Author: Pei He <pe...@google.com>
Authored: Tue Jan 10 10:57:38 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 10 18:13:52 2017 -0800

----------------------------------------------------------------------
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 18 +++--
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 74 ++++++++++++++++++++
 2 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3490a36e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c4c7344..2098148 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -527,14 +527,24 @@ class BigQueryServicesImpl implements BigQueryServices {
       BackOff backoff =
           FluentBackoff.DEFAULT
               .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
+      return isTableEmpty(
+          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId),
+          backoff,
+          Sleeper.DEFAULT);
+    }
+
+    @VisibleForTesting
+    boolean isTableEmpty(TableReference tableRef, BackOff backoff, Sleeper sleeper)
+        throws IOException, InterruptedException {
       TableDataList dataList = executeWithRetries(
-          client.tabledata().list(projectId, datasetId, tableId),
+          client.tabledata().list(
+              tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
           String.format(
               "Unable to list table data: %s, aborting after %d retries.",
-              tableId, MAX_RPC_RETRIES),
-          Sleeper.DEFAULT,
+              tableRef.getTableId(), MAX_RPC_RETRIES),
+          sleeper,
           backoff,
-          ALWAYS_RETRY);
+          DONT_RETRY_NOT_FOUND);
       return dataList.getRows() == null || dataList.getRows().isEmpty();
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3490a36e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index bfd1319..1ce10f1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Verify.verifyNotNull;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -49,6 +50,7 @@ import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
 import com.google.api.services.bigquery.model.TableDataInsertAllResponse.InsertErrors;
+import com.google.api.services.bigquery.model.TableDataList;
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -385,6 +387,78 @@ public class BigQueryServicesImplTest {
   }
 
   @Test
+  public void testIsTableEmptySucceeds() throws Exception {
+    TableReference tableRef = new TableReference()
+        .setProjectId("projectId")
+        .setDatasetId("datasetId")
+        .setTableId("tableId");
+
+    TableDataList testDataList = new TableDataList()
+        .setRows(ImmutableList.of(new TableRow()));
+
+    // First response is 403 rate limited, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(testDataList));
+
+    BigQueryServicesImpl.DatasetServiceImpl datasetService =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    assertFalse(
+        datasetService.isTableEmpty(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT));
+
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+  }
+
+  @Test
+  public void testIsTableEmptyNoRetryForNotFound() throws IOException, InterruptedException {
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(404);
+
+    BigQueryServicesImpl.DatasetServiceImpl datasetService =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    TableReference tableRef = new TableReference()
+        .setProjectId("projectId")
+        .setDatasetId("datasetId")
+        .setTableId("tableId");
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(String.format("Unable to list table data: %s", tableRef.getTableId()));
+
+    try {
+      datasetService.isTableEmpty(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+    } finally {
+      verify(response, times(1)).getStatusCode();
+      verify(response, times(1)).getContent();
+      verify(response, times(1)).getContentType();
+    }
+  }
+
+  @Test
+  public void testIsTableEmptyThrows() throws Exception {
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(401);
+
+    TableReference tableRef = new TableReference()
+        .setProjectId("projectId")
+        .setDatasetId("datasetId")
+        .setTableId("tableId");
+
+    BigQueryServicesImpl.DatasetServiceImpl datasetService =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(String.format("Unable to list table data: %s", tableRef.getTableId()));
+
+    datasetService.isTableEmpty(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+  }
+
+  @Test
   public void testExecuteWithRetries() throws IOException, InterruptedException {
     Table testTable = new Table();
 


[2/2] beam git commit: This closes #1759

Posted by dh...@apache.org.
This closes #1759


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/055f452c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/055f452c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/055f452c

Branch: refs/heads/master
Commit: 055f452c0a2f62920f58928e66aebe21744eaf41
Parents: 68b4c34 3490a36
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jan 10 18:14:09 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 10 18:14:09 2017 -0800

----------------------------------------------------------------------
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 18 +++--
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 74 ++++++++++++++++++++
 2 files changed, 88 insertions(+), 4 deletions(-)
----------------------------------------------------------------------