You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by tg...@apache.org on 2017/01/25 02:01:04 UTC

[1/2] beam git commit: Refactor BigQueryServices to have TableReference in method signatures

Repository: beam
Updated Branches:
  refs/heads/master e77de7c61 -> 7402d7600


Refactor BigQueryServices to have TableReference in method signatures


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f9d1d682
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f9d1d682
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f9d1d682

Branch: refs/heads/master
Commit: f9d1d682340fa3083bc18723605bf3d0aa6d76cd
Parents: e77de7c
Author: Pei He <pe...@google.com>
Authored: Tue Jan 24 16:45:16 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jan 24 18:00:40 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 40 +++++--------------
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  9 ++---
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 23 ++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 41 ++++++++------------
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   |  3 +-
 5 files changed, 40 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index fa49f55..b6f9fb0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -997,8 +997,7 @@ public class BigQueryIO {
         TableReference table = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
 
         Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
-            .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId())
-            .getNumBytes();
+            .getTable(table).getNumBytes();
         tableSizeBytes.compareAndSet(null, numBytes);
       }
       return tableSizeBytes.get();
@@ -1088,10 +1087,7 @@ public class BigQueryIO {
       DatasetService tableService = bqServices.getDatasetService(bqOptions);
       if (referencedTables != null && !referencedTables.isEmpty()) {
         TableReference queryTable = referencedTables.get(0);
-        location = tableService.getTable(
-            queryTable.getProjectId(),
-            queryTable.getDatasetId(),
-            queryTable.getTableId()).getLocation();
+        location = tableService.getTable(queryTable).getLocation();
       }
 
       // 2. Create the temporary dataset in the query location.
@@ -1120,10 +1116,7 @@ public class BigQueryIO {
           JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
 
       DatasetService tableService = bqServices.getDatasetService(bqOptions);
-      tableService.deleteTable(
-          tableToRemove.getProjectId(),
-          tableToRemove.getDatasetId(),
-          tableToRemove.getTableId());
+      tableService.deleteTable(tableToRemove);
       tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
     }
 
@@ -1227,10 +1220,8 @@ public class BigQueryIO {
       String extractJobId = getExtractJobId(jobIdToken);
       List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
 
-      TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
-          tableToExtract.getProjectId(),
-          tableToExtract.getDatasetId(),
-          tableToExtract.getTableId()).getSchema();
+      TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
+          .getTable(tableToExtract).getSchema();
 
       cleanupTempResource(bqOptions);
       return createSources(tempFiles, tableSchema);
@@ -1867,13 +1858,9 @@ public class BigQueryIO {
           DatasetService datasetService,
           TableReference tableRef) {
         try {
-          if (datasetService.getTable(
-              tableRef.getProjectId(),
-              tableRef.getDatasetId(),
-              tableRef.getTableId()) != null) {
+          if (datasetService.getTable(tableRef) != null) {
             checkState(
-                datasetService.isTableEmpty(
-                    tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
+                datasetService.isTableEmpty(tableRef),
                 "BigQuery table is not empty: %s.",
                 BigQueryIO.toTableSpec(tableRef));
           }
@@ -2535,10 +2522,7 @@ public class BigQueryIO {
         for (TableReference tableRef : tempTables) {
           try {
             LOG.debug("Deleting table {}", toJsonString(tableRef));
-            tableService.deleteTable(
-                tableRef.getProjectId(),
-                tableRef.getDatasetId(),
-                tableRef.getTableId());
+            tableService.deleteTable(tableRef);
           } catch (Exception e) {
             LOG.warn("Failed to delete the table {}", toJsonString(tableRef), e);
           }
@@ -2587,7 +2571,7 @@ public class BigQueryIO {
 
   private static void verifyTablePresence(DatasetService datasetService, TableReference table) {
     try {
-      datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId());
+      datasetService.getTable(table);
     } catch (Exception e) {
       ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
       if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
@@ -2712,11 +2696,7 @@ public class BigQueryIO {
           // every thread from attempting a create and overwhelming our BigQuery quota.
           DatasetService datasetService = bqServices.getDatasetService(options);
           if (!createdTables.contains(tableSpec)) {
-            Table table = datasetService.getTable(
-                tableReference.getProjectId(),
-                tableReference.getDatasetId(),
-                tableReference.getTableId());
-            if (table == null) {
+            if (datasetService.getTable(tableReference) == null) {
               TableSchema tableSchema = JSON_FACTORY.fromString(
                   jsonTableSchema.get(), TableSchema.class);
               datasetService.createTable(

http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 32cf46d..03e4391 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -119,8 +119,7 @@ interface BigQueryServices extends Serializable {
      * <p>Returns null if the table is not found.
      */
     @Nullable
-    Table getTable(String projectId, String datasetId, String tableId)
-        throws InterruptedException, IOException;
+    Table getTable(TableReference tableRef) throws InterruptedException, IOException;
 
     /**
      * Creates the specified table if it does not exist.
@@ -131,16 +130,14 @@ interface BigQueryServices extends Serializable {
      * Deletes the table specified by tableId from the dataset.
      * If the table contains data, all the data will be deleted.
      */
-    void deleteTable(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException;
+    void deleteTable(TableReference tableRef) throws IOException, InterruptedException;
 
     /**
      * Returns true if the table is empty.
      *
      * @throws IOException if the table is not found.
      */
-    boolean isTableEmpty(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException;
+    boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException;
 
     /**
      * Gets the specified {@link Dataset} resource by dataset ID.

http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index c524ce4..75796ab 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -394,15 +394,12 @@ class BigQueryServicesImpl implements BigQueryServices {
      */
     @Override
     @Nullable
-    public Table getTable(String projectId, String datasetId, String tableId)
+    public Table getTable(TableReference tableRef)
         throws IOException, InterruptedException {
       BackOff backoff =
           FluentBackoff.DEFAULT
               .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-      return getTable(
-          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId),
-          backoff,
-          Sleeper.DEFAULT);
+      return getTable(tableRef, backoff, Sleeper.DEFAULT);
     }
 
     @VisibleForTesting
@@ -506,31 +503,27 @@ class BigQueryServicesImpl implements BigQueryServices {
      * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
      */
     @Override
-    public void deleteTable(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException {
+    public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
       BackOff backoff =
           FluentBackoff.DEFAULT
               .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
       executeWithRetries(
-          client.tables().delete(projectId, datasetId, tableId),
+          client.tables().delete(
+              tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()),
           String.format(
               "Unable to delete table: %s, aborting after %d retries.",
-              tableId, MAX_RPC_RETRIES),
+              tableRef.getTableId(), MAX_RPC_RETRIES),
           Sleeper.DEFAULT,
           backoff,
           ALWAYS_RETRY);
     }
 
     @Override
-    public boolean isTableEmpty(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException {
+    public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException {
       BackOff backoff =
           FluentBackoff.DEFAULT
               .withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF).backoff();
-      return isTableEmpty(
-          new TableReference().setProjectId(projectId).setDatasetId(datasetId).setTableId(tableId),
-          backoff,
-          Sleeper.DEFAULT);
+      return isTableEmpty(tableRef, backoff, Sleeper.DEFAULT);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index ba7f44e..0b8d60d 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doNothing;
@@ -526,18 +527,18 @@ public class BigQueryIOTest implements Serializable {
   private static class FakeDatasetService implements DatasetService, Serializable {
 
     @Override
-    public Table getTable(String projectId, String datasetId, String tableId)
+    public Table getTable(TableReference tableRef)
         throws InterruptedException, IOException {
       synchronized (tables) {
         Map<String, TableContainer> dataset =
             checkNotNull(
-                tables.get(projectId, datasetId),
+                tables.get(tableRef.getProjectId(), tableRef.getDatasetId()),
                 "Tried to get a dataset %s:%s from %s, but no such dataset was set",
-                projectId,
-                datasetId,
-                tableId,
+                tableRef.getProjectId(),
+                tableRef.getDatasetId(),
+                tableRef.getTableId(),
                 FakeDatasetService.class.getSimpleName());
-        TableContainer tableContainer = dataset.get(tableId);
+        TableContainer tableContainer = dataset.get(tableRef.getTableId());
         return tableContainer == null ? null : tableContainer.getTable();
       }
     }
@@ -569,8 +570,7 @@ public class BigQueryIOTest implements Serializable {
     }
 
     @Override
-    public void deleteTable(String projectId, String datasetId, String tableId)
-        throws IOException, InterruptedException {
+    public void deleteTable(TableReference tableRef) throws IOException, InterruptedException {
       throw new UnsupportedOperationException("Unsupported");
     }
 
@@ -595,9 +595,9 @@ public class BigQueryIOTest implements Serializable {
     }
 
     @Override
-    public boolean isTableEmpty(String projectId, String datasetId, String tableId)
+    public boolean isTableEmpty(TableReference tableRef)
         throws IOException, InterruptedException {
-      Long numBytes = getTable(projectId, datasetId, tableId).getNumBytes();
+      Long numBytes = getTable(tableRef).getNumBytes();
       return numBytes == null || numBytes == 0L;
     }
 
@@ -1738,7 +1738,7 @@ public class BigQueryIOTest implements Serializable {
     IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
-    when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
+    when(mockDatasetService.getTable(any(TableReference.class)))
         .thenReturn(new Table().setSchema(new TableSchema()));
 
     Assert.assertThat(
@@ -1810,13 +1810,9 @@ public class BigQueryIOTest implements Serializable {
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)
                 .setReferencedTables(ImmutableList.of(queryTable))));
-    when(mockDatasetService.getTable(
-        eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId())))
+    when(mockDatasetService.getTable(eq(queryTable)))
         .thenReturn(new Table().setSchema(new TableSchema()));
-    when(mockDatasetService.getTable(
-        eq(destinationTable.getProjectId()),
-        eq(destinationTable.getDatasetId()),
-        eq(destinationTable.getTableId())))
+    when(mockDatasetService.getTable(eq(destinationTable)))
         .thenReturn(new Table().setSchema(new TableSchema()));
     IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
@@ -1898,10 +1894,7 @@ public class BigQueryIOTest implements Serializable {
         .thenReturn(new JobStatistics().setQuery(
             new JobStatistics2()
                 .setTotalBytesProcessed(100L)));
-    when(mockDatasetService.getTable(
-        eq(destinationTable.getProjectId()),
-        eq(destinationTable.getDatasetId()),
-        eq(destinationTable.getTableId())))
+    when(mockDatasetService.getTable(eq(destinationTable)))
         .thenReturn(new Table().setSchema(new TableSchema()));
     IOChannelUtils.setIOFactoryInternal("mock", mockIOChannelFactory, true /* override */);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
@@ -2263,9 +2256,9 @@ public class BigQueryIOTest implements Serializable {
         BigQueryIO.parseTableSpec(String.format("%s:%s.%s", projectId, datasetId, tables.get(2))));
 
     doThrow(new IOException("Unable to delete table"))
-        .when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(0));
-    doNothing().when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(1));
-    doNothing().when(mockDatasetService).deleteTable(projectId, datasetId, tables.get(2));
+        .when(mockDatasetService).deleteTable(tableRefs.get(0));
+    doNothing().when(mockDatasetService).deleteTable(tableRefs.get(1));
+    doNothing().when(mockDatasetService).deleteTable(tableRefs.get(2));
 
     WriteRename.removeTemporaryTables(mockDatasetService, tableRefs);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f9d1d682/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
index 8130238..7b5b226 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilTest.java
@@ -370,7 +370,8 @@ public class BigQueryUtilTest {
     BigQueryServicesImpl.DatasetServiceImpl services =
             new BigQueryServicesImpl.DatasetServiceImpl(mockClient, options);
 
-    services.getTable("project", "dataset", "table");
+    services.getTable(
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"));
 
     verifyTableGet();
   }


[2/2] beam git commit: This closes #1838

Posted by tg...@apache.org.
This closes #1838


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7402d760
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7402d760
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7402d760

Branch: refs/heads/master
Commit: 7402d760004f8e7f79ca122c5fd26ec4f35dbdbe
Parents: e77de7c f9d1d68
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jan 24 18:00:43 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jan 24 18:00:43 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 40 +++++--------------
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  9 ++---
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 23 ++++-------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 41 ++++++++------------
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   |  3 +-
 5 files changed, 40 insertions(+), 76 deletions(-)
----------------------------------------------------------------------