You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 20:23:27 UTC

[19/50] incubator-beam git commit: BigQueryIO: fix streaming write, typo in API

BigQueryIO: fix streaming write, typo in API

Also improve testing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5fb4f5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5fb4f5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5fb4f5de

Branch: refs/heads/gearpump-runner
Commit: 5fb4f5de9515db717818f1e3ffd7ca3c6eba5614
Parents: 4206408
Author: Sam McVeety <sg...@google.com>
Authored: Fri Dec 16 18:10:28 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Dec 16 23:53:49 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 23 +++++--
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 72 ++++++++++++--------
 2 files changed, 63 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fb4f5de/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 28049ed..7bb1e51 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -370,7 +370,8 @@ public class BigQueryIO {
     }
   }
 
-  private static class TableSpecToTableRef
+  @VisibleForTesting
+  static class TableSpecToTableRef
       implements SerializableFunction<String, TableReference> {
     @Override
     public TableReference apply(String from) {
@@ -807,6 +808,7 @@ public class BigQueryIO {
       /**
        * Returns the query to be read, or {@code null} if reading from a table instead.
        */
+      @Nullable
       public String getQuery() {
         return query == null ? null : query.get();
       }
@@ -814,7 +816,8 @@ public class BigQueryIO {
       /**
        * Returns the query to be read, or {@code null} if reading from a table instead.
        */
-      public ValueProvider<String> getQueryProivder() {
+      @Nullable
+      public ValueProvider<String> getQueryProvider() {
         return query;
       }
 
@@ -2813,7 +2816,8 @@ public class BigQueryIO {
    * a randomUUID is generated only once per bucket of data. The actual unique
    * id is created by concatenating this randomUUID with a sequential number.
    */
-  private static class TagWithUniqueIdsAndTable
+  @VisibleForTesting
+  static class TagWithUniqueIdsAndTable
       extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
     /** TableSpec to write to. */
     private final ValueProvider<String> tableSpec;
@@ -2830,8 +2834,12 @@ public class BigQueryIO {
       checkArgument(table == null ^ tableRefFunction == null,
           "Exactly one of table or tableRefFunction should be set");
       if (table != null) {
-        if (table.isAccessible() && table.get().getProjectId() == null) {
-          table.get().setProjectId(options.as(BigQueryOptions.class).getProject());
+        if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
+          TableReference tableRef = table.get()
+              .setProjectId(options.as(BigQueryOptions.class).getProject());
+          table = NestedValueProvider.of(
+              StaticValueProvider.of(toJsonString(tableRef)),
+              new JsonTableRefToTableRef());
         }
         this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
       } else {
@@ -2870,6 +2878,11 @@ public class BigQueryIO {
       }
     }
 
+    @VisibleForTesting
+    ValueProvider<String> getTableSpec() {
+      return tableSpec;
+    }
+
     private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
       if (tableSpec != null) {
         return tableSpec.get();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5fb4f5de/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index b78316f..dc566d2 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -2242,43 +2243,60 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testRuntimeOptionsNotCalledInApplyInputTable() throws IOException {
+  public void testRuntimeOptionsNotCalledInApplyInputTable() {
     RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService());
+    bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    pipeline
-        .apply(BigQueryIO.Read
-            .from(options.getInputTable()).withoutValidation()
-            .withTestServices(fakeBqServices))
-            .apply(BigQueryIO.Write
-            .to(options.getOutputTable())
-            .withSchema(NestedValueProvider.of(
-                options.getOutputSchema(), new JsonSchemaToTableSchema()))
-            .withTestServices(fakeBqServices)
-            .withoutValidation());
+    BigQueryIO.Read.Bound read = BigQueryIO.Read.from(
+        options.getInputTable()).withoutValidation();
+    pipeline.apply(read);
+    // Test that this doesn't throw.
+    DisplayData.from(read);
   }
 
   @Test
-  public void testRuntimeOptionsNotCalledInApplyInputQuery() throws IOException {
+  public void testRuntimeOptionsNotCalledInApplyInputQuery() {
     RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService());
+    bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
+    BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery(
+        options.getInputQuery()).withoutValidation();
+    pipeline.apply(read);
+    // Test that this doesn't throw.
+    DisplayData.from(read);
+  }
+
+  @Test
+  public void testRuntimeOptionsNotCalledInApplyOutput() {
+    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+    Pipeline pipeline = TestPipeline.create(options);
+    BigQueryIO.Write.Bound write = BigQueryIO.Write
+        .to(options.getOutputTable())
+        .withSchema(NestedValueProvider.of(
+            options.getOutputSchema(), new JsonSchemaToTableSchema()))
+        .withoutValidation();
     pipeline
-        .apply(BigQueryIO.Read
-            .fromQuery(options.getInputQuery()).withoutValidation()
-            .withTestServices(fakeBqServices))
-            .apply(BigQueryIO.Write
-            .to(options.getOutputTable())
-            .withSchema(NestedValueProvider.of(
-                options.getOutputSchema(), new JsonSchemaToTableSchema()))
-            .withTestServices(fakeBqServices)
-            .withoutValidation());
+        .apply(Create.<TableRow>of())
+        .apply(write);
+    // Test that this doesn't throw.
+    DisplayData.from(write);
+  }
+
+  @Test
+  public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() {
+    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
+    bqOptions.setProject("project");
+    BigQueryIO.TagWithUniqueIdsAndTable tag =
+        new BigQueryIO.TagWithUniqueIdsAndTable(
+            bqOptions, NestedValueProvider.of(
+                StaticValueProvider.of("data_set.table_name"),
+                new BigQueryIO.TableSpecToTableRef()), null);
+    TableReference table = BigQueryIO.parseTableSpec(tag.getTableSpec().get());
+    assertNotNull(table.getProjectId());
   }
 
   private static void testNumFiles(File tempDir, int expectedNumFiles) {