You are viewing a plain text version of this content. (The canonical link was a hyperlink that is not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/06 22:44:49 UTC
[3/4] incubator-beam git commit: Forward port Dataflow PR-453 to Beam
Forward port Dataflow PR-453 to Beam
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f33e869
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f33e869
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f33e869
Branch: refs/heads/master
Commit: 9f33e8692fe4852b1825c5464eafa8d9e9786425
Parents: 0ac0caf
Author: Pei He <pe...@google.com>
Authored: Mon Oct 3 21:05:44 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:33:04 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 24 +++---
.../sdk/io/gcp/bigquery/BigQueryServices.java | 3 +-
.../io/gcp/bigquery/BigQueryServicesImpl.java | 19 +++--
.../gcp/bigquery/BigQueryTableRowIterator.java | 2 +-
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 83 ++++++++++++++++++++
5 files changed, 112 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index eb98ea8..716fe39 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -528,6 +528,7 @@ public class BigQueryIO {
checkState(
table != null || query != null,
"Invalid BigQueryIO.Read: one of table reference and query must be set");
+
if (table != null) {
checkState(
flattenResults == null,
@@ -910,21 +911,26 @@ public class BigQueryIO {
protected TableReference getTableToExtract(BigQueryOptions bqOptions)
throws IOException, InterruptedException {
// 1. Find the location of the query.
- TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions)
- .getQuery()
- .getReferencedTables()
- .get(0);
+ String location = null;
+ List<TableReference> referencedTables =
+ dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
DatasetService tableService = bqServices.getDatasetService(bqOptions);
- String location = tableService.getTable(
- dryRunTempTable.getProjectId(),
- dryRunTempTable.getDatasetId(),
- dryRunTempTable.getTableId()).getLocation();
+ if (referencedTables != null && !referencedTables.isEmpty()) {
+ TableReference queryTable = referencedTables.get(0);
+ location = tableService.getTable(
+ queryTable.getProjectId(),
+ queryTable.getDatasetId(),
+ queryTable.getTableId()).getLocation();
+ }
// 2. Create the temporary dataset in the query location.
TableReference tableToExtract =
JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
tableService.createDataset(
- tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
+ tableToExtract.getProjectId(),
+ tableToExtract.getDatasetId(),
+ location,
+ "Dataset for BigQuery query job temporary table");
// 3. Execute the query.
String queryJobId = jobIdToken + "-query";
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 1d9fb28..ca7e491 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -141,7 +141,8 @@ interface BigQueryServices extends Serializable {
/**
* Create a {@link Dataset} with the given {@code location} and {@code description}.
*/
- void createDataset(String projectId, String datasetId, String location, String description)
+ void createDataset(
+ String projectId, String datasetId, @Nullable String location, @Nullable String description)
throws IOException, InterruptedException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3e057bb..61f1a1a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -472,7 +472,7 @@ class BigQueryServicesImpl implements BigQueryServices {
*/
@Override
public void createDataset(
- String projectId, String datasetId, String location, String description)
+ String projectId, String datasetId, @Nullable String location, @Nullable String description)
throws IOException, InterruptedException {
BackOff backoff =
FluentBackoff.DEFAULT
@@ -483,19 +483,22 @@ class BigQueryServicesImpl implements BigQueryServices {
private void createDataset(
String projectId,
String datasetId,
- String location,
- String description,
+ @Nullable String location,
+ @Nullable String description,
Sleeper sleeper,
BackOff backoff) throws IOException, InterruptedException {
DatasetReference datasetRef = new DatasetReference()
.setProjectId(projectId)
.setDatasetId(datasetId);
- Dataset dataset = new Dataset()
- .setDatasetReference(datasetRef)
- .setLocation(location)
- .setFriendlyName(location)
- .setDescription(description);
+ Dataset dataset = new Dataset().setDatasetReference(datasetRef);
+ if (location != null) {
+ dataset.setLocation(location);
+ }
+ if (description != null) {
+ dataset.setFriendlyName(description);
+ dataset.setDescription(description);
+ }
Exception lastException;
do {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 608995a..92f7542 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -421,7 +421,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
queryConfig.setQuery(query);
queryConfig.setAllowLargeResults(true);
queryConfig.setFlattenResults(flattenResults);
- queryConfig.setFlattenResults(useLegacySql);
+ queryConfig.setUseLegacySql(useLegacySql);
TableReference destinationTable = new TableReference();
destinationTable.setProjectId(projectId);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index ab9716e..05a7c5c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1271,6 +1271,89 @@ public class BigQueryIOTest implements Serializable {
}
@Test
+ public void testBigQueryNoTableQuerySourceInitSplit() throws Exception {
+ TableReference dryRunTable = new TableReference();
+
+ Job queryJob = new Job();
+ JobStatistics queryJobStats = new JobStatistics();
+ JobStatistics2 queryStats = new JobStatistics2();
+ queryStats.setReferencedTables(ImmutableList.of(dryRunTable));
+ queryJobStats.setQuery(queryStats);
+ queryJob.setStatus(new JobStatus())
+ .setStatistics(queryJobStats);
+
+ Job extractJob = new Job();
+ JobStatistics extractJobStats = new JobStatistics();
+ JobStatistics4 extractStats = new JobStatistics4();
+ extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
+ extractJobStats.setExtract(extractStats);
+ extractJob.setStatus(new JobStatus())
+ .setStatistics(extractJobStats);
+
+ FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+ .withJobService(mockJobService)
+ .withDatasetService(mockDatasetService)
+ .readerReturns(
+ toJsonString(new TableRow().set("name", "a").set("number", "1")),
+ toJsonString(new TableRow().set("name", "b").set("number", "2")),
+ toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+ String jobIdToken = "testJobIdToken";
+ String extractDestinationDir = "mock://tempLocation";
+ TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
+ BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
+ jobIdToken, "query", destinationTable, true /* flattenResults */, true /* useLegacySql */,
+ extractDestinationDir, fakeBqServices);
+
+ List<TableRow> expected = ImmutableList.of(
+ new TableRow().set("name", "a").set("number", "1"),
+ new TableRow().set("name", "b").set("number", "2"),
+ new TableRow().set("name", "c").set("number", "3"));
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options.setTempLocation(extractDestinationDir);
+
+ when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
+ .thenReturn(new JobStatistics().setQuery(
+ new JobStatistics2()
+ .setTotalBytesProcessed(100L)));
+ when(mockDatasetService.getTable(
+ eq(destinationTable.getProjectId()),
+ eq(destinationTable.getDatasetId()),
+ eq(destinationTable.getTableId())))
+ .thenReturn(new Table().setSchema(new TableSchema()));
+ IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+ when(mockIOChannelFactory.resolve(anyString(), anyString()))
+ .thenReturn("mock://tempLocation/output");
+ when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
+ .thenReturn(extractJob);
+
+ Assert.assertThat(
+ SourceTestUtils.readFromSource(bqSource, options),
+ CoreMatchers.is(expected));
+ SourceTestUtils.assertSplitAtFractionBehavior(
+ bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+
+ List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+ assertEquals(1, sources.size());
+ BoundedSource<TableRow> actual = sources.get(0);
+ assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+ Mockito.verify(mockJobService)
+ .startQueryJob(
+ Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
+ Mockito.verify(mockJobService)
+ .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
+ Mockito.verify(mockDatasetService)
+ .createDataset(anyString(), anyString(), anyString(), anyString());
+ ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
+ ArgumentCaptor.forClass(JobConfigurationQuery.class);
+ Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
+ assertEquals(true, queryConfigArg.getValue().getFlattenResults());
+ assertEquals(true, queryConfigArg.getValue().getUseLegacySql());
+ }
+
+ @Test
public void testTransformingSource() throws Exception {
int numElements = 10000;
@SuppressWarnings("deprecation")