You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/10 21:51:25 UTC

[1/2] beam git commit: [BEAM-1258] BigQueryServicesImpl.getTable() returns null when tables not found.

Repository: beam
Updated Branches:
  refs/heads/master c1b7f8695 -> 68b4c34a4


[BEAM-1258] BigQueryServicesImpl.getTable() returns null when tables not found.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e670e7e0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e670e7e0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e670e7e0

Branch: refs/heads/master
Commit: e670e7e0aa19845a162f7da423b663cbd4199f4d
Parents: c1b7f86
Author: Pei He <pe...@google.com>
Authored: Tue Jan 10 11:49:37 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 10 13:51:07 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  5 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 32 +++++++---
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 65 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e670e7e0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 8ca473d..7173996 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -114,8 +114,11 @@ interface BigQueryServices extends Serializable {
    */
   interface DatasetService {
     /**
-     * Gets the specified {@link Table} resource by table ID or {@code null} if no table exists.
+     * Gets the specified {@link Table} resource by table ID.
+     *
+     * <p>Returns null if the table is not found.
      */
+    @Nullable
     Table getTable(String projectId, String datasetId, String tableId)
         throws InterruptedException, IOException;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e670e7e0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 4eb8e7b..c4c7344 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -393,19 +393,37 @@ class BigQueryServicesImpl implements BigQueryServices {
      * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
      */
     @Override
+    @Nullable
     public Table getTable(String projectId, String datasetId, String tableId)
         throws IOException, InterruptedException {
       BackOff backoff =
           FluentBackoff.DEFAULT
               .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-      return executeWithRetries(
-          client.tables().get(projectId, datasetId, tableId),
-          String.format(
-              "Unable to get table: %s, aborting after %d retries.",
-              tableId, MAX_RPC_RETRIES),
-          Sleeper.DEFAULT,
+      return getTable(
+          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId),
           backoff,
-          DONT_RETRY_NOT_FOUND);
+          Sleeper.DEFAULT);
+    }
+
+    @VisibleForTesting
+    @Nullable
+    Table getTable(TableReference ref, BackOff backoff, Sleeper sleeper)
+        throws IOException, InterruptedException {
+      try {
+        return executeWithRetries(
+            client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()),
+            String.format(
+                "Unable to get table: %s, aborting after %d retries.",
+                ref.getTableId(), MAX_RPC_RETRIES),
+            sleeper,
+            backoff,
+            DONT_RETRY_NOT_FOUND);
+      } catch (IOException e) {
+        if (errorExtractor.itemNotFound(e)) {
+          return null;
+        }
+        throw e;
+      }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/e670e7e0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 10ed8bd..bfd1319 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -320,6 +320,71 @@ public class BigQueryServicesImplTest {
   }
 
   @Test
+  public void testGetTableSucceeds() throws Exception {
+    TableReference tableRef = new TableReference()
+        .setProjectId("projectId")
+        .setDatasetId("datasetId")
+        .setTableId("tableId");
+
+    Table testTable = new Table();
+    testTable.setTableReference(tableRef);
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(testTable));
+
+    BigQueryServicesImpl.DatasetServiceImpl datasetService =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+
+    assertEquals(testTable, table);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+  }
+
+  @Test
+  public void testGetTableNotFound() throws IOException, InterruptedException {
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(404);
+
+    BigQueryServicesImpl.DatasetServiceImpl datasetService =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+
+    TableReference tableRef = new TableReference()
+        .setProjectId("projectId")
+        .setDatasetId("datasetId")
+        .setTableId("tableId");
+    Table table = datasetService.getTable(tableRef, BackOff.ZERO_BACKOFF, Sleeper.DEFAULT);
+
+    assertNull(table);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  @Test
+  public void testGetTableThrows() throws Exception {
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(401);
+
+    TableReference tableRef = new TableReference()
+        .setProjectId("projectId")
+        .setDatasetId("datasetId")
+        .setTableId("tableId");
+
+    thrown.expect(IOException.class);
+    thrown.expectMessage(String.format("Unable to get table: %s", tableRef.getTableId()));
+
+    BigQueryServicesImpl.DatasetServiceImpl datasetService =
+        new BigQueryServicesImpl.DatasetServiceImpl(bigquery, PipelineOptionsFactory.create());
+    datasetService.getTable(tableRef, BackOff.STOP_BACKOFF, Sleeper.DEFAULT);
+  }
+
+  @Test
   public void testExecuteWithRetries() throws IOException, InterruptedException {
     Table testTable = new Table();
 


[2/2] beam git commit: This closes #1760

Posted by dh...@apache.org.
This closes #1760


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68b4c34a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68b4c34a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68b4c34a

Branch: refs/heads/master
Commit: 68b4c34a413fe94080aa7fa3fd8c5934c5752d19
Parents: c1b7f86 e670e7e
Author: Dan Halperin <dh...@google.com>
Authored: Tue Jan 10 13:51:14 2017 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 10 13:51:14 2017 -0800

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  5 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 32 +++++++---
 .../gcp/bigquery/BigQueryServicesImplTest.java  | 65 ++++++++++++++++++++
 3 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------