You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/06 22:44:49 UTC

[3/4] incubator-beam git commit: Forward port Dataflow PR-453 to Beam

Forward port Dataflow PR-453 to Beam


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f33e869
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f33e869
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f33e869

Branch: refs/heads/master
Commit: 9f33e8692fe4852b1825c5464eafa8d9e9786425
Parents: 0ac0caf
Author: Pei He <pe...@google.com>
Authored: Mon Oct 3 21:05:44 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 15:33:04 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 24 +++---
 .../sdk/io/gcp/bigquery/BigQueryServices.java   |  3 +-
 .../io/gcp/bigquery/BigQueryServicesImpl.java   | 19 +++--
 .../gcp/bigquery/BigQueryTableRowIterator.java  |  2 +-
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 83 ++++++++++++++++++++
 5 files changed, 112 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index eb98ea8..716fe39 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -528,6 +528,7 @@ public class BigQueryIO {
         checkState(
             table != null || query != null,
             "Invalid BigQueryIO.Read: one of table reference and query must be set");
+
         if (table != null) {
           checkState(
               flattenResults == null,
@@ -910,21 +911,26 @@ public class BigQueryIO {
     protected TableReference getTableToExtract(BigQueryOptions bqOptions)
         throws IOException, InterruptedException {
       // 1. Find the location of the query.
-      TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions)
-          .getQuery()
-          .getReferencedTables()
-          .get(0);
+      String location = null;
+      List<TableReference> referencedTables =
+          dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
       DatasetService tableService = bqServices.getDatasetService(bqOptions);
-      String location = tableService.getTable(
-          dryRunTempTable.getProjectId(),
-          dryRunTempTable.getDatasetId(),
-          dryRunTempTable.getTableId()).getLocation();
+      if (referencedTables != null && !referencedTables.isEmpty()) {
+        TableReference queryTable = referencedTables.get(0);
+        location = tableService.getTable(
+            queryTable.getProjectId(),
+            queryTable.getDatasetId(),
+            queryTable.getTableId()).getLocation();
+      }
 
       // 2. Create the temporary dataset in the query location.
       TableReference tableToExtract =
           JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
       tableService.createDataset(
-          tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
+          tableToExtract.getProjectId(),
+          tableToExtract.getDatasetId(),
+          location,
+          "Dataset for BigQuery query job temporary table");
 
       // 3. Execute the query.
       String queryJobId = jobIdToken + "-query";

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index 1d9fb28..ca7e491 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -141,7 +141,8 @@ interface BigQueryServices extends Serializable {
     /**
      * Create a {@link Dataset} with the given {@code location} and {@code description}.
      */
-    void createDataset(String projectId, String datasetId, String location, String description)
+    void createDataset(
+        String projectId, String datasetId, @Nullable String location, @Nullable String description)
         throws IOException, InterruptedException;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 3e057bb..61f1a1a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -472,7 +472,7 @@ class BigQueryServicesImpl implements BigQueryServices {
      */
     @Override
     public void createDataset(
-        String projectId, String datasetId, String location, String description)
+        String projectId, String datasetId, @Nullable String location, @Nullable String description)
         throws IOException, InterruptedException {
       BackOff backoff =
           FluentBackoff.DEFAULT
@@ -483,19 +483,22 @@ class BigQueryServicesImpl implements BigQueryServices {
     private void createDataset(
         String projectId,
         String datasetId,
-        String location,
-        String description,
+        @Nullable String location,
+        @Nullable String description,
         Sleeper sleeper,
         BackOff backoff) throws IOException, InterruptedException {
       DatasetReference datasetRef = new DatasetReference()
           .setProjectId(projectId)
           .setDatasetId(datasetId);
 
-      Dataset dataset = new Dataset()
-          .setDatasetReference(datasetRef)
-          .setLocation(location)
-          .setFriendlyName(location)
-          .setDescription(description);
+      Dataset dataset = new Dataset().setDatasetReference(datasetRef);
+      if (location != null) {
+        dataset.setLocation(location);
+      }
+      if (description != null) {
+        dataset.setFriendlyName(description);
+        dataset.setDescription(description);
+      }
 
       Exception lastException;
       do {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
index 608995a..92f7542 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -421,7 +421,7 @@ class BigQueryTableRowIterator implements AutoCloseable {
     queryConfig.setQuery(query);
     queryConfig.setAllowLargeResults(true);
     queryConfig.setFlattenResults(flattenResults);
-    queryConfig.setFlattenResults(useLegacySql);
+    queryConfig.setUseLegacySql(useLegacySql);
 
     TableReference destinationTable = new TableReference();
     destinationTable.setProjectId(projectId);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f33e869/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index ab9716e..05a7c5c 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1271,6 +1271,89 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testBigQueryNoTableQuerySourceInitSplit() throws Exception {
+    TableReference dryRunTable = new TableReference();
+
+    Job queryJob = new Job();
+    JobStatistics queryJobStats = new JobStatistics();
+    JobStatistics2 queryStats = new JobStatistics2();
+    queryStats.setReferencedTables(ImmutableList.of(dryRunTable));
+    queryJobStats.setQuery(queryStats);
+    queryJob.setStatus(new JobStatus())
+        .setStatistics(queryJobStats);
+
+    Job extractJob = new Job();
+    JobStatistics extractJobStats = new JobStatistics();
+    JobStatistics4 extractStats = new JobStatistics4();
+    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
+    extractJobStats.setExtract(extractStats);
+    extractJob.setStatus(new JobStatus())
+        .setStatistics(extractJobStats);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService)
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", "1")),
+            toJsonString(new TableRow().set("name", "b").set("number", "2")),
+            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+    String jobIdToken = "testJobIdToken";
+    String extractDestinationDir = "mock://tempLocation";
+    TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
+    BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
+        jobIdToken, "query", destinationTable, true /* flattenResults */, true /* useLegacySql */,
+        extractDestinationDir, fakeBqServices);
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"));
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation(extractDestinationDir);
+
+    when(mockJobService.dryRunQuery(anyString(), Mockito.<JobConfigurationQuery>any()))
+        .thenReturn(new JobStatistics().setQuery(
+            new JobStatistics2()
+                .setTotalBytesProcessed(100L)));
+    when(mockDatasetService.getTable(
+        eq(destinationTable.getProjectId()),
+        eq(destinationTable.getDatasetId()),
+        eq(destinationTable.getTableId())))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    when(mockIOChannelFactory.resolve(anyString(), anyString()))
+        .thenReturn("mock://tempLocation/output");
+    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
+        .thenReturn(extractJob);
+
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(bqSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+
+    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+    assertEquals(1, sources.size());
+    BoundedSource<TableRow> actual = sources.get(0);
+    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+    Mockito.verify(mockJobService)
+        .startQueryJob(
+            Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
+    Mockito.verify(mockJobService)
+        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
+    Mockito.verify(mockDatasetService)
+        .createDataset(anyString(), anyString(), anyString(), anyString());
+    ArgumentCaptor<JobConfigurationQuery> queryConfigArg =
+        ArgumentCaptor.forClass(JobConfigurationQuery.class);
+    Mockito.verify(mockJobService).dryRunQuery(anyString(), queryConfigArg.capture());
+    assertEquals(true, queryConfigArg.getValue().getFlattenResults());
+    assertEquals(true, queryConfigArg.getValue().getUseLegacySql());
+  }
+
+  @Test
   public void testTransformingSource() throws Exception {
     int numElements = 10000;
     @SuppressWarnings("deprecation")