You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/01 04:34:32 UTC

[1/2] incubator-beam git commit: Replacing BigQuery direct calls with BigQueryServices abstraction

Repository: incubator-beam
Updated Branches:
  refs/heads/master e7418e2e8 -> df1d5f61e


Replacing BigQuery direct calls with BigQueryServices abstraction


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336b5160
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336b5160
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336b5160

Branch: refs/heads/master
Commit: 336b5160f13dfac29e625b7c9f42b92a65041461
Parents: e7418e2
Author: Pei He <pe...@google.com>
Authored: Tue Jun 28 17:49:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 30 21:34:26 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 118 ++++++++-----------
 .../apache/beam/sdk/util/BigQueryServices.java  |  12 ++
 .../beam/sdk/util/BigQueryServicesImpl.java     |  37 ++++++
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  39 ++++--
 4 files changed, 128 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 790e3ff..7955022 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.util.BigQueryServices.JobService;
 import org.apache.beam.sdk.util.BigQueryServicesImpl;
 import org.apache.beam.sdk.util.BigQueryTableInserter;
-import org.apache.beam.sdk.util.BigQueryTableRowIterator;
 import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
@@ -85,7 +84,6 @@ import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.QueryRequest;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
@@ -386,7 +384,7 @@ public class BigQueryIO {
        */
       final boolean validate;
       @Nullable final Boolean flattenResults;
-      @Nullable final BigQueryServices testBigQueryServices;
+      @Nullable BigQueryServices bigQueryServices;
 
       private static final String QUERY_VALIDATION_FAILURE_ERROR =
           "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
@@ -403,18 +401,18 @@ public class BigQueryIO {
             null /* jsonTableRef */,
             true /* validate */,
             null /* flattenResults */,
-            null /* testBigQueryServices */);
+            null /* bigQueryServices */);
       }
 
       private Bound(
           String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate,
-          @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) {
+          @Nullable Boolean flattenResults, @Nullable BigQueryServices bigQueryServices) {
         super(name);
         this.jsonTableRef = jsonTableRef;
         this.query = query;
         this.validate = validate;
         this.flattenResults = flattenResults;
-        this.testBigQueryServices = testBigQueryServices;
+        this.bigQueryServices = bigQueryServices;
       }
 
       /**
@@ -434,7 +432,7 @@ public class BigQueryIO {
        */
       public Bound from(TableReference table) {
         return new Bound(
-            name, query, toJsonString(table), validate, flattenResults, testBigQueryServices);
+            name, query, toJsonString(table), validate, flattenResults, bigQueryServices);
       }
 
       /**
@@ -449,7 +447,7 @@ public class BigQueryIO {
        */
       public Bound fromQuery(String query) {
         return new Bound(name, query, jsonTableRef, validate,
-            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), testBigQueryServices);
+            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), bigQueryServices);
       }
 
       /**
@@ -458,7 +456,7 @@ public class BigQueryIO {
        * occurs.
        */
       public Bound withoutValidation() {
-        return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices);
+        return new Bound(name, query, jsonTableRef, false, flattenResults, bigQueryServices);
       }
 
       /**
@@ -469,7 +467,7 @@ public class BigQueryIO {
        * from a table will cause an error during validation.
        */
       public Bound withoutResultFlattening() {
-        return new Bound(name, query, jsonTableRef, validate, false, testBigQueryServices);
+        return new Bound(name, query, jsonTableRef, validate, false, bigQueryServices);
       }
 
       @VisibleForTesting
@@ -499,36 +497,28 @@ public class BigQueryIO {
         }
 
         if (validate) {
+          BigQueryServices bqServices = getBigQueryServices();
           // Check for source table/query presence for early failure notification.
           // Note that a presence check can fail if the table or dataset are created by earlier
           // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these
           // cases the withoutValidation method can be used to disable the check.
           if (table != null) {
-            verifyDatasetPresence(bqOptions, table);
-            verifyTablePresence(bqOptions, table);
+            DatasetService datasetService = bqServices.getDatasetService(bqOptions);
+            verifyDatasetPresence(datasetService, table);
+            verifyTablePresence(datasetService, table);
           }
           if (query != null) {
-            dryRunQuery(bqOptions, query);
+            JobService jobService = bqServices.getJobService(bqOptions);
+            try {
+              jobService.dryRunQuery(bqOptions.getProject(), query);
+            } catch (Exception e) {
+              throw new IllegalArgumentException(
+                  String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
+            }
           }
         }
       }
 
-      private static void dryRunQuery(BigQueryOptions options, String query) {
-        Bigquery client = Transport.newBigQueryClient(options).build();
-        QueryRequest request = new QueryRequest();
-        request.setQuery(query);
-        request.setDryRun(true);
-
-        try {
-          BigQueryTableRowIterator.executeWithBackOff(
-              client.jobs().query(options.getProject(), request), QUERY_VALIDATION_FAILURE_ERROR,
-              query);
-        } catch (Exception e) {
-          throw new IllegalArgumentException(
-              String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
-        }
-      }
-
       @Override
       public PCollection<TableRow> apply(PInput input) {
         String uuid = randomUUIDString();
@@ -669,11 +659,10 @@ public class BigQueryIO {
       }
 
       private BigQueryServices getBigQueryServices() {
-        if (testBigQueryServices != null) {
-          return testBigQueryServices;
-        } else {
-          return new BigQueryServicesImpl();
+        if (bigQueryServices == null) {
+          bigQueryServices = new BigQueryServicesImpl();
         }
+        return bigQueryServices;
       }
     }
 
@@ -1443,8 +1432,7 @@ public class BigQueryIO {
       // An option to indicate if table validation is desired. Default is true.
       final boolean validate;
 
-      // A fake or mock BigQueryServices for tests.
-      @Nullable private BigQueryServices testBigQueryServices;
+      @Nullable private BigQueryServices bigQueryServices;
 
       private static class TranslateTableSpecFunction implements
           SerializableFunction<BoundedWindow, TableReference> {
@@ -1475,14 +1463,14 @@ public class BigQueryIO {
             CreateDisposition.CREATE_IF_NEEDED,
             WriteDisposition.WRITE_EMPTY,
             true /* validate */,
-            null /* testBigQueryServices */);
+            null /* bigQueryServices */);
       }
 
       private Bound(String name, @Nullable String jsonTableRef,
           @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
           @Nullable String jsonSchema,
           CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
-          @Nullable BigQueryServices testBigQueryServices) {
+          @Nullable BigQueryServices bigQueryServices) {
         super(name);
         this.jsonTableRef = jsonTableRef;
         this.tableRefFunction = tableRefFunction;
@@ -1490,7 +1478,7 @@ public class BigQueryIO {
         this.createDisposition = checkNotNull(createDisposition, "createDisposition");
         this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
         this.validate = validate;
-        this.testBigQueryServices = testBigQueryServices;
+        this.bigQueryServices = bigQueryServices;
       }
 
       /**
@@ -1510,7 +1498,7 @@ public class BigQueryIO {
        */
       public Bound to(TableReference table) {
         return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, testBigQueryServices);
+            writeDisposition, validate, bigQueryServices);
       }
 
       /**
@@ -1539,7 +1527,7 @@ public class BigQueryIO {
       public Bound toTableReference(
           SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, testBigQueryServices);
+            writeDisposition, validate, bigQueryServices);
       }
 
       /**
@@ -1550,7 +1538,7 @@ public class BigQueryIO {
        */
       public Bound withSchema(TableSchema schema) {
         return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema),
-            createDisposition, writeDisposition, validate, testBigQueryServices);
+            createDisposition, writeDisposition, validate, bigQueryServices);
       }
 
       /**
@@ -1560,7 +1548,7 @@ public class BigQueryIO {
        */
       public Bound withCreateDisposition(CreateDisposition createDisposition) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, testBigQueryServices);
+            writeDisposition, validate, bigQueryServices);
       }
 
       /**
@@ -1570,7 +1558,7 @@ public class BigQueryIO {
        */
       public Bound withWriteDisposition(WriteDisposition writeDisposition) {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, testBigQueryServices);
+            writeDisposition, validate, bigQueryServices);
       }
 
       /**
@@ -1580,7 +1568,7 @@ public class BigQueryIO {
        */
       public Bound withoutValidation() {
         return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, false, testBigQueryServices);
+            writeDisposition, false, bigQueryServices);
       }
 
       @VisibleForTesting
@@ -1590,18 +1578,18 @@ public class BigQueryIO {
       }
 
       private static void verifyTableEmpty(
-          BigQueryOptions options,
+          DatasetService datasetService,
           TableReference table) {
         try {
-          Bigquery client = Transport.newBigQueryClient(options).build();
-          BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
-          if (!inserter.isEmpty(table)) {
+          boolean isEmpty = datasetService.isTableEmpty(
+              table.getProjectId(), table.getDatasetId(), table.getTableId());
+          if (!isEmpty) {
             throw new IllegalArgumentException(
                 "BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
           }
-        } catch (IOException e) {
+        } catch (IOException | InterruptedException e) {
           ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-          if (errorExtractor.itemNotFound(e)) {
+          if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
             // Nothing to do. If the table does not exist, it is considered empty.
           } else {
             throw new RuntimeException(
@@ -1633,16 +1621,17 @@ public class BigQueryIO {
         if (jsonTableRef != null && validate) {
           TableReference table = getTableWithDefaultProject(options);
 
+          DatasetService datasetService = getBigQueryServices().getDatasetService(options);
           // Check for destination table presence and emptiness for early failure notification.
           // Note that a presence check can fail when the table or dataset is created by an earlier
           // stage of the pipeline. For these cases the #withoutValidation method can be used to
           // disable the check.
-          verifyDatasetPresence(options, table);
+          verifyDatasetPresence(datasetService, table);
           if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-            verifyTablePresence(options, table);
+            verifyTablePresence(datasetService, table);
           }
           if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-            verifyTableEmpty(options, table);
+            verifyTableEmpty(datasetService, table);
           }
         }
 
@@ -1663,7 +1652,7 @@ public class BigQueryIO {
           checkArgument(
               !Strings.isNullOrEmpty(tempLocation),
               "BigQueryIO.Write needs a GCS temp location to store temp files.");
-          if (testBigQueryServices == null) {
+          if (bigQueryServices == null) {
             try {
               GcsPath.fromUri(tempLocation);
             } catch (IllegalArgumentException e) {
@@ -1789,11 +1778,10 @@ public class BigQueryIO {
       }
 
       private BigQueryServices getBigQueryServices() {
-        if (testBigQueryServices != null) {
-          return testBigQueryServices;
-        } else {
-          return new BigQueryServicesImpl();
+        if (bigQueryServices == null) {
+          bigQueryServices = new BigQueryServicesImpl();
         }
+        return bigQueryServices;
       }
     }
 
@@ -1985,12 +1973,9 @@ public class BigQueryIO {
     }
   }
 
-  private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) {
+  private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
     try {
-      Bigquery client = Transport.newBigQueryClient(options).build();
-      BigQueryTableRowIterator.executeWithBackOff(
-          client.datasets().get(table.getProjectId(), table.getDatasetId()),
-          RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table));
+      datasetService.getDataset(table.getProjectId(), table.getDatasetId());
     } catch (Exception e) {
       ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
       if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
@@ -2006,12 +1991,9 @@ public class BigQueryIO {
     }
   }
 
-  private static void verifyTablePresence(BigQueryOptions options, TableReference table) {
+  private static void verifyTablePresence(DatasetService datasetService, TableReference table) {
     try {
-      Bigquery client = Transport.newBigQueryClient(options).build();
-      BigQueryTableRowIterator.executeWithBackOff(
-          client.tables().get(table.getProjectId(), table.getDatasetId(), table.getTableId()),
-          RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table));
+      datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId());
     } catch (Exception e) {
       ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
       if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
index f82edf4..514e005 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
@@ -116,6 +116,18 @@ public interface BigQueryServices extends Serializable {
         throws IOException, InterruptedException;
 
     /**
+     * Returns true if the table is empty.
+     */
+    boolean isTableEmpty(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException;
+
+    /**
+     * Gets the specified {@link Dataset} resource by dataset ID.
+     */
+    Dataset getDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException;
+
+    /**
      * Create a {@link Dataset} with the given {@code location} and {@code description}.
      */
     void createDataset(String projectId, String datasetId, String location, String description)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
index 01ea45f..1aadeb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
@@ -36,6 +36,7 @@ import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataList;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
@@ -307,6 +308,42 @@ public class BigQueryServicesImpl implements BigQueryServices {
           backoff);
     }
 
+    @Override
+    public boolean isTableEmpty(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      TableDataList dataList = executeWithRetries(
+          client.tabledata().list(projectId, datasetId, tableId),
+          String.format(
+              "Unable to list table data: %s, aborting after %d retries.",
+              tableId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+      return dataList.getRows() == null || dataList.getRows().isEmpty();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds max RPC retries.
+     */
+    @Override
+    public Dataset getDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      return executeWithRetries(
+          client.datasets().get(projectId, datasetId),
+          String.format(
+              "Unable to get dataset: %s, aborting after %d retries.",
+              datasetId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+
     /**
      * {@inheritDoc}
      *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index f0d3fce..43bf314 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -438,14 +438,22 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testValidateReadSetsDefaultProject() {
+  public void testValidateReadSetsDefaultProject() throws Exception {
+    String projectId = "someproject";
+    String datasetId = "somedataset";
     BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    options.setProject("someproject");
+    options.setProject(projectId);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService);
+    when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
+        new RuntimeException("Unable to confirm BigQuery dataset presence"));
 
     Pipeline p = TestPipeline.create(options);
 
     TableReference tableRef = new TableReference();
-    tableRef.setDatasetId("somedataset");
+    tableRef.setDatasetId(datasetId);
     tableRef.setTableId("sometable");
 
     thrown.expect(RuntimeException.class);
@@ -453,7 +461,8 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage(
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for table")));
-    p.apply(BigQueryIO.Read.from(tableRef));
+    p.apply(BigQueryIO.Read.from(tableRef)
+        .withTestServices(fakeBqServices));
   }
 
   @Test
@@ -759,15 +768,24 @@ public class BigQueryIOTest implements Serializable {
     assertThat(displayData, hasDisplayItem("validation", false));
   }
 
-  private void testWriteValidatesDataset(boolean streaming) {
+  private void testWriteValidatesDataset(boolean streaming) throws Exception {
+    String projectId = "someproject";
+    String datasetId = "somedataset";
+
     BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    options.setProject("someproject");
+    options.setProject(projectId);
     options.setStreaming(streaming);
 
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService);
+    when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
+        new RuntimeException("Unable to confirm BigQuery dataset presence"));
+
     Pipeline p = TestPipeline.create(options);
 
     TableReference tableRef = new TableReference();
-    tableRef.setDatasetId("somedataset");
+    tableRef.setDatasetId(datasetId);
     tableRef.setTableId("sometable");
 
     thrown.expect(RuntimeException.class);
@@ -779,16 +797,17 @@ public class BigQueryIOTest implements Serializable {
      .apply(BigQueryIO.Write
          .to(tableRef)
          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-         .withSchema(new TableSchema()));
+         .withSchema(new TableSchema())
+         .withTestServices(fakeBqServices));
   }
 
   @Test
-  public void testWriteValidatesDatasetBatch() {
+  public void testWriteValidatesDatasetBatch() throws Exception {
     testWriteValidatesDataset(false);
   }
 
   @Test
-  public void testWriteValidatesDatasetStreaming() {
+  public void testWriteValidatesDatasetStreaming() throws Exception {
     testWriteValidatesDataset(true);
   }
 


[2/2] incubator-beam git commit: Closes #555

Posted by dh...@apache.org.
Closes #555


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df1d5f61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df1d5f61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df1d5f61

Branch: refs/heads/master
Commit: df1d5f61e1a07ea123c23ec00f9c36c2f1843073
Parents: e7418e2 336b516
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jun 30 21:34:27 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 30 21:34:27 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 118 ++++++++-----------
 .../apache/beam/sdk/util/BigQueryServices.java  |  12 ++
 .../beam/sdk/util/BigQueryServicesImpl.java     |  37 ++++++
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  39 ++++--
 4 files changed, 128 insertions(+), 78 deletions(-)
----------------------------------------------------------------------