You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/01 04:34:32 UTC
[1/2] incubator-beam git commit: Replacing BigQuery direct calls with
BigQueryServices abstraction
Repository: incubator-beam
Updated Branches:
refs/heads/master e7418e2e8 -> df1d5f61e
Replacing BigQuery direct calls with BigQueryServices abstraction
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/336b5160
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/336b5160
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/336b5160
Branch: refs/heads/master
Commit: 336b5160f13dfac29e625b7c9f42b92a65041461
Parents: e7418e2
Author: Pei He <pe...@google.com>
Authored: Tue Jun 28 17:49:52 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 30 21:34:26 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 118 ++++++++-----------
.../apache/beam/sdk/util/BigQueryServices.java | 12 ++
.../beam/sdk/util/BigQueryServicesImpl.java | 37 ++++++
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 39 ++++--
4 files changed, 128 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 790e3ff..7955022 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -54,7 +54,6 @@ import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
import org.apache.beam.sdk.util.BigQueryServices.JobService;
import org.apache.beam.sdk.util.BigQueryServicesImpl;
import org.apache.beam.sdk.util.BigQueryTableInserter;
-import org.apache.beam.sdk.util.BigQueryTableRowIterator;
import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
@@ -85,7 +84,6 @@ import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
@@ -386,7 +384,7 @@ public class BigQueryIO {
*/
final boolean validate;
@Nullable final Boolean flattenResults;
- @Nullable final BigQueryServices testBigQueryServices;
+ @Nullable BigQueryServices bigQueryServices;
private static final String QUERY_VALIDATION_FAILURE_ERROR =
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
@@ -403,18 +401,18 @@ public class BigQueryIO {
null /* jsonTableRef */,
true /* validate */,
null /* flattenResults */,
- null /* testBigQueryServices */);
+ null /* bigQueryServices */);
}
private Bound(
String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate,
- @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) {
+ @Nullable Boolean flattenResults, @Nullable BigQueryServices bigQueryServices) {
super(name);
this.jsonTableRef = jsonTableRef;
this.query = query;
this.validate = validate;
this.flattenResults = flattenResults;
- this.testBigQueryServices = testBigQueryServices;
+ this.bigQueryServices = bigQueryServices;
}
/**
@@ -434,7 +432,7 @@ public class BigQueryIO {
*/
public Bound from(TableReference table) {
return new Bound(
- name, query, toJsonString(table), validate, flattenResults, testBigQueryServices);
+ name, query, toJsonString(table), validate, flattenResults, bigQueryServices);
}
/**
@@ -449,7 +447,7 @@ public class BigQueryIO {
*/
public Bound fromQuery(String query) {
return new Bound(name, query, jsonTableRef, validate,
- MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), testBigQueryServices);
+ MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), bigQueryServices);
}
/**
@@ -458,7 +456,7 @@ public class BigQueryIO {
* occurs.
*/
public Bound withoutValidation() {
- return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices);
+ return new Bound(name, query, jsonTableRef, false, flattenResults, bigQueryServices);
}
/**
@@ -469,7 +467,7 @@ public class BigQueryIO {
* from a table will cause an error during validation.
*/
public Bound withoutResultFlattening() {
- return new Bound(name, query, jsonTableRef, validate, false, testBigQueryServices);
+ return new Bound(name, query, jsonTableRef, validate, false, bigQueryServices);
}
@VisibleForTesting
@@ -499,36 +497,28 @@ public class BigQueryIO {
}
if (validate) {
+ BigQueryServices bqServices = getBigQueryServices();
// Check for source table/query presence for early failure notification.
// Note that a presence check can fail if the table or dataset are created by earlier
// stages of the pipeline or if a query depends on earlier stages of a pipeline. For these
// cases the withoutValidation method can be used to disable the check.
if (table != null) {
- verifyDatasetPresence(bqOptions, table);
- verifyTablePresence(bqOptions, table);
+ DatasetService datasetService = bqServices.getDatasetService(bqOptions);
+ verifyDatasetPresence(datasetService, table);
+ verifyTablePresence(datasetService, table);
}
if (query != null) {
- dryRunQuery(bqOptions, query);
+ JobService jobService = bqServices.getJobService(bqOptions);
+ try {
+ jobService.dryRunQuery(bqOptions.getProject(), query);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
+ }
}
}
}
- private static void dryRunQuery(BigQueryOptions options, String query) {
- Bigquery client = Transport.newBigQueryClient(options).build();
- QueryRequest request = new QueryRequest();
- request.setQuery(query);
- request.setDryRun(true);
-
- try {
- BigQueryTableRowIterator.executeWithBackOff(
- client.jobs().query(options.getProject(), request), QUERY_VALIDATION_FAILURE_ERROR,
- query);
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
- }
- }
-
@Override
public PCollection<TableRow> apply(PInput input) {
String uuid = randomUUIDString();
@@ -669,11 +659,10 @@ public class BigQueryIO {
}
private BigQueryServices getBigQueryServices() {
- if (testBigQueryServices != null) {
- return testBigQueryServices;
- } else {
- return new BigQueryServicesImpl();
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
}
+ return bigQueryServices;
}
}
@@ -1443,8 +1432,7 @@ public class BigQueryIO {
// An option to indicate if table validation is desired. Default is true.
final boolean validate;
- // A fake or mock BigQueryServices for tests.
- @Nullable private BigQueryServices testBigQueryServices;
+ @Nullable private BigQueryServices bigQueryServices;
private static class TranslateTableSpecFunction implements
SerializableFunction<BoundedWindow, TableReference> {
@@ -1475,14 +1463,14 @@ public class BigQueryIO {
CreateDisposition.CREATE_IF_NEEDED,
WriteDisposition.WRITE_EMPTY,
true /* validate */,
- null /* testBigQueryServices */);
+ null /* bigQueryServices */);
}
private Bound(String name, @Nullable String jsonTableRef,
@Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
@Nullable String jsonSchema,
CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
- @Nullable BigQueryServices testBigQueryServices) {
+ @Nullable BigQueryServices bigQueryServices) {
super(name);
this.jsonTableRef = jsonTableRef;
this.tableRefFunction = tableRefFunction;
@@ -1490,7 +1478,7 @@ public class BigQueryIO {
this.createDisposition = checkNotNull(createDisposition, "createDisposition");
this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
this.validate = validate;
- this.testBigQueryServices = testBigQueryServices;
+ this.bigQueryServices = bigQueryServices;
}
/**
@@ -1510,7 +1498,7 @@ public class BigQueryIO {
*/
public Bound to(TableReference table) {
return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, validate, testBigQueryServices);
+ writeDisposition, validate, bigQueryServices);
}
/**
@@ -1539,7 +1527,7 @@ public class BigQueryIO {
public Bound toTableReference(
SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, validate, testBigQueryServices);
+ writeDisposition, validate, bigQueryServices);
}
/**
@@ -1550,7 +1538,7 @@ public class BigQueryIO {
*/
public Bound withSchema(TableSchema schema) {
return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema),
- createDisposition, writeDisposition, validate, testBigQueryServices);
+ createDisposition, writeDisposition, validate, bigQueryServices);
}
/**
@@ -1560,7 +1548,7 @@ public class BigQueryIO {
*/
public Bound withCreateDisposition(CreateDisposition createDisposition) {
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, validate, testBigQueryServices);
+ writeDisposition, validate, bigQueryServices);
}
/**
@@ -1570,7 +1558,7 @@ public class BigQueryIO {
*/
public Bound withWriteDisposition(WriteDisposition writeDisposition) {
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, validate, testBigQueryServices);
+ writeDisposition, validate, bigQueryServices);
}
/**
@@ -1580,7 +1568,7 @@ public class BigQueryIO {
*/
public Bound withoutValidation() {
return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
- writeDisposition, false, testBigQueryServices);
+ writeDisposition, false, bigQueryServices);
}
@VisibleForTesting
@@ -1590,18 +1578,18 @@ public class BigQueryIO {
}
private static void verifyTableEmpty(
- BigQueryOptions options,
+ DatasetService datasetService,
TableReference table) {
try {
- Bigquery client = Transport.newBigQueryClient(options).build();
- BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
- if (!inserter.isEmpty(table)) {
+ boolean isEmpty = datasetService.isTableEmpty(
+ table.getProjectId(), table.getDatasetId(), table.getTableId());
+ if (!isEmpty) {
throw new IllegalArgumentException(
"BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
}
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
- if (errorExtractor.itemNotFound(e)) {
+ if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
// Nothing to do. If the table does not exist, it is considered empty.
} else {
throw new RuntimeException(
@@ -1633,16 +1621,17 @@ public class BigQueryIO {
if (jsonTableRef != null && validate) {
TableReference table = getTableWithDefaultProject(options);
+ DatasetService datasetService = getBigQueryServices().getDatasetService(options);
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
// disable the check.
- verifyDatasetPresence(options, table);
+ verifyDatasetPresence(datasetService, table);
if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
- verifyTablePresence(options, table);
+ verifyTablePresence(datasetService, table);
}
if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
- verifyTableEmpty(options, table);
+ verifyTableEmpty(datasetService, table);
}
}
@@ -1663,7 +1652,7 @@ public class BigQueryIO {
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
- if (testBigQueryServices == null) {
+ if (bigQueryServices == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
@@ -1789,11 +1778,10 @@ public class BigQueryIO {
}
private BigQueryServices getBigQueryServices() {
- if (testBigQueryServices != null) {
- return testBigQueryServices;
- } else {
- return new BigQueryServicesImpl();
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
}
+ return bigQueryServices;
}
}
@@ -1985,12 +1973,9 @@ public class BigQueryIO {
}
}
- private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) {
+ private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
try {
- Bigquery client = Transport.newBigQueryClient(options).build();
- BigQueryTableRowIterator.executeWithBackOff(
- client.datasets().get(table.getProjectId(), table.getDatasetId()),
- RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table));
+ datasetService.getDataset(table.getProjectId(), table.getDatasetId());
} catch (Exception e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
@@ -2006,12 +1991,9 @@ public class BigQueryIO {
}
}
- private static void verifyTablePresence(BigQueryOptions options, TableReference table) {
+ private static void verifyTablePresence(DatasetService datasetService, TableReference table) {
try {
- Bigquery client = Transport.newBigQueryClient(options).build();
- BigQueryTableRowIterator.executeWithBackOff(
- client.tables().get(table.getProjectId(), table.getDatasetId(), table.getTableId()),
- RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table));
+ datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId());
} catch (Exception e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
index f82edf4..514e005 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
@@ -116,6 +116,18 @@ public interface BigQueryServices extends Serializable {
throws IOException, InterruptedException;
/**
+ * Returns true if the table is empty.
+ */
+ boolean isTableEmpty(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException;
+
+ /**
+ * Gets the specified {@link Dataset} resource by dataset ID.
+ */
+ Dataset getDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException;
+
+ /**
* Create a {@link Dataset} with the given {@code location} and {@code description}.
*/
void createDataset(String projectId, String datasetId, String location, String description)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
index 01ea45f..1aadeb2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
@@ -36,6 +36,7 @@ import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataList;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
@@ -307,6 +308,42 @@ public class BigQueryServicesImpl implements BigQueryServices {
backoff);
}
+ @Override
+ public boolean isTableEmpty(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ TableDataList dataList = executeWithRetries(
+ client.tabledata().list(projectId, datasetId, tableId),
+ String.format(
+ "Unable to list table data: %s, aborting after %d retries.",
+ tableId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ return dataList.getRows() == null || dataList.getRows().isEmpty();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds the maximum number of RPC retries.
+ */
+ @Override
+ public Dataset getDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ return executeWithRetries(
+ client.datasets().get(projectId, datasetId),
+ String.format(
+ "Unable to get dataset: %s, aborting after %d retries.",
+ datasetId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+
/**
* {@inheritDoc}
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/336b5160/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index f0d3fce..43bf314 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -438,14 +438,22 @@ public class BigQueryIOTest implements Serializable {
}
@Test
- public void testValidateReadSetsDefaultProject() {
+ public void testValidateReadSetsDefaultProject() throws Exception {
+ String projectId = "someproject";
+ String datasetId = "somedataset";
BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
- options.setProject("someproject");
+ options.setProject(projectId);
+
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(mockJobService)
+ .withDatasetService(mockDatasetService);
+ when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
+ new RuntimeException("Unable to confirm BigQuery dataset presence"));
Pipeline p = TestPipeline.create(options);
TableReference tableRef = new TableReference();
- tableRef.setDatasetId("somedataset");
+ tableRef.setDatasetId(datasetId);
tableRef.setTableId("sometable");
thrown.expect(RuntimeException.class);
@@ -453,7 +461,8 @@ public class BigQueryIOTest implements Serializable {
thrown.expectMessage(
Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
.or(Matchers.containsString("BigQuery dataset not found for table")));
- p.apply(BigQueryIO.Read.from(tableRef));
+ p.apply(BigQueryIO.Read.from(tableRef)
+ .withTestServices(fakeBqServices));
}
@Test
@@ -759,15 +768,24 @@ public class BigQueryIOTest implements Serializable {
assertThat(displayData, hasDisplayItem("validation", false));
}
- private void testWriteValidatesDataset(boolean streaming) {
+ private void testWriteValidatesDataset(boolean streaming) throws Exception {
+ String projectId = "someproject";
+ String datasetId = "somedataset";
+
BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
- options.setProject("someproject");
+ options.setProject(projectId);
options.setStreaming(streaming);
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(mockJobService)
+ .withDatasetService(mockDatasetService);
+ when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
+ new RuntimeException("Unable to confirm BigQuery dataset presence"));
+
Pipeline p = TestPipeline.create(options);
TableReference tableRef = new TableReference();
- tableRef.setDatasetId("somedataset");
+ tableRef.setDatasetId(datasetId);
tableRef.setTableId("sometable");
thrown.expect(RuntimeException.class);
@@ -779,16 +797,17 @@ public class BigQueryIOTest implements Serializable {
.apply(BigQueryIO.Write
.to(tableRef)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
- .withSchema(new TableSchema()));
+ .withSchema(new TableSchema())
+ .withTestServices(fakeBqServices));
}
@Test
- public void testWriteValidatesDatasetBatch() {
+ public void testWriteValidatesDatasetBatch() throws Exception {
testWriteValidatesDataset(false);
}
@Test
- public void testWriteValidatesDatasetStreaming() {
+ public void testWriteValidatesDatasetStreaming() throws Exception {
testWriteValidatesDataset(true);
}
[2/2] incubator-beam git commit: Closes #555
Posted by dh...@apache.org.
Closes #555
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/df1d5f61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/df1d5f61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/df1d5f61
Branch: refs/heads/master
Commit: df1d5f61e1a07ea123c23ec00f9c36c2f1843073
Parents: e7418e2 336b516
Author: Dan Halperin <dh...@google.com>
Authored: Thu Jun 30 21:34:27 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Jun 30 21:34:27 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 118 ++++++++-----------
.../apache/beam/sdk/util/BigQueryServices.java | 12 ++
.../beam/sdk/util/BigQueryServicesImpl.java | 37 ++++++
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 39 ++++--
4 files changed, 128 insertions(+), 78 deletions(-)
----------------------------------------------------------------------