Posted to commits@beam.apache.org by dh...@apache.org on 2017/01/29 16:21:54 UTC

[08/50] beam git commit: [BEAM-1071] Allow BigQueryIO to write to tables with the CREATE_NEVER disposition

[BEAM-1071] Allow BigQueryIO to write to tables with the CREATE_NEVER disposition
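
For orientation, the sketch below shows the combination this commit newly
permits: a streaming (unbounded) write with CREATE_NEVER and no schema, which
the previous validation rejected outright. The sketch is illustrative only --
the project, table, and column names are made up, it is not part of the
commit, and it assumes the pre-2.0 BigQueryIO.Write API used on this branch.

  // Illustrative sketch, not from this commit; imports assume the
  // 0.5.0-era SDK package layout.
  import com.google.api.services.bigquery.model.TableRow;
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.CountingInput;
  import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.options.StreamingOptions;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.transforms.SimpleFunction;

  StreamingOptions options =
      PipelineOptionsFactory.create().as(StreamingOptions.class);
  options.setStreaming(true);
  Pipeline p = Pipeline.create(options);

  p.apply(CountingInput.unbounded())            // unbounded => streaming write path
      .apply(MapElements.via(new SimpleFunction<Long, TableRow>() {
        @Override
        public TableRow apply(Long i) {
          return new TableRow().set("id", i);   // "id" is an assumed column
        }
      }))
      .setCoder(TableRowJsonCoder.of())
      .apply(BigQueryIO.Write
          .to("my-project:dataset.existing_table")  // table must already exist
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
          // No .withSchema(...): the new validation accepts a null schema,
          // and a null schema in turn requires CREATE_NEVER.
          .withoutValidation());

Note that the validation added in this commit still rejects CREATE_NEVER when
a tablespec function is supplied.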


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc369522
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc369522
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc369522

Branch: refs/heads/python-sdk
Commit: dc369522d1cfa46ae9058919d93229de05db2b6a
Parents: 11c3cd7
Author: Sam McVeety <sg...@google.com>
Authored: Mon Dec 12 18:47:20 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Tue Jan 24 14:41:39 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 51 ++++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 36 ++++++++++++++
 2 files changed, 71 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dc369522/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index aff199a..fa49f55 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1925,10 +1925,17 @@ public class BigQueryIO {
 
         if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
           // We will use BigQuery's streaming write API -- validate supported dispositions.
-          checkArgument(
-              createDisposition != CreateDisposition.CREATE_NEVER,
-              "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
-                  + " using a tablespec function.");
+          if (tableRefFunction != null) {
+            checkArgument(
+                createDisposition != CreateDisposition.CREATE_NEVER,
+                "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+                + " function.");
+          }
+          if (jsonSchema == null) {
+            checkArgument(
+                createDisposition == CreateDisposition.CREATE_NEVER,
+                "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
+          }
 
           checkArgument(
               writeDisposition != WriteDisposition.WRITE_TRUNCATE,
@@ -1965,7 +1972,9 @@ public class BigQueryIO {
         if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
           return input.apply(
               new StreamWithDeDup(getTable(), tableRefFunction,
-                  NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices));
+                  jsonSchema == null ? null : NestedValueProvider.of(
+                      jsonSchema, new JsonSchemaToTableSchema()),
+                  createDisposition, bqServices));
         }
 
         ValueProvider<TableReference> table = getTableWithDefaultProject(options);
@@ -2608,16 +2617,19 @@ public class BigQueryIO {
    * Implementation of DoFn to perform streaming BigQuery write.
    */
   @SystemDoFnInternal
-  private static class StreamingWriteFn
+  @VisibleForTesting
+  static class StreamingWriteFn
       extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
     /** TableSchema in JSON. Use String to make the class Serializable. */
-    private final ValueProvider<String> jsonTableSchema;
+    @Nullable private final ValueProvider<String> jsonTableSchema;
 
     private final BigQueryServices bqServices;
 
     /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
     private transient Map<String, List<TableRow>> tableRows;
 
+    private final Write.CreateDisposition createDisposition;
+
     /** The list of unique ids for each BigQuery table row. */
     private transient Map<String, List<String>> uniqueIdsForTableRows;
 
@@ -2631,9 +2643,12 @@ public class BigQueryIO {
         createAggregator("ByteCount", Sum.ofLongs());
 
     /** Constructor. */
-    StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) {
-      this.jsonTableSchema =
+    StreamingWriteFn(@Nullable ValueProvider<TableSchema> schema,
+        Write.CreateDisposition createDisposition,
+        BigQueryServices bqServices) {
+      this.jsonTableSchema = schema == null ? null :
           NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
+      this.createDisposition = createDisposition;
       this.bqServices = checkNotNull(bqServices, "bqServices");
     }
 
@@ -2689,7 +2704,8 @@ public class BigQueryIO {
     public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
         throws InterruptedException, IOException {
       TableReference tableReference = parseTableSpec(tableSpec);
-      if (!createdTables.contains(tableSpec)) {
+      if (createDisposition != Write.CreateDisposition.CREATE_NEVER
+          && !createdTables.contains(tableSpec)) {
         synchronized (createdTables) {
           // Another thread may have succeeded in creating the table in the meanwhile, so
           // check again. This check isn't needed for correctness, but we add it to prevent
@@ -2945,19 +2961,22 @@ public class BigQueryIO {
    * it leverages BigQuery best effort de-dup mechanism.
    */
   private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
-    private final transient ValueProvider<TableReference> tableReference;
-    private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-    private final transient ValueProvider<TableSchema> tableSchema;
+    @Nullable private final transient ValueProvider<TableReference> tableReference;
+    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
+    @Nullable private final transient ValueProvider<TableSchema> tableSchema;
+    private final Write.CreateDisposition createDisposition;
     private final BigQueryServices bqServices;
 
     /** Constructor. */
     StreamWithDeDup(ValueProvider<TableReference> tableReference,
-        SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-        ValueProvider<TableSchema> tableSchema,
+        @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+        @Nullable ValueProvider<TableSchema> tableSchema,
+        Write.CreateDisposition createDisposition,
         BigQueryServices bqServices) {
       this.tableReference = tableReference;
       this.tableRefFunction = tableRefFunction;
       this.tableSchema = tableSchema;
+      this.createDisposition = createDisposition;
       this.bqServices = checkNotNull(bqServices, "bqServices");
     }
 
@@ -2989,7 +3008,7 @@ public class BigQueryIO {
       tagged
           .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()))
           .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
-          .apply(ParDo.of(new StreamingWriteFn(tableSchema, bqServices)));
+          .apply(ParDo.of(new StreamingWriteFn(tableSchema, createDisposition, bqServices)));
 
       // Note that the implementation to return PDone here breaks the
       // implicit assumption about the job execution order. If a user
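
Restated for readability, the getOrCreateTable logic above now has the
following effective shape (a sketch, not the verbatim source; the creation
branch is elided and is unchanged apart from the new guard):

  // With CREATE_NEVER, table creation is skipped entirely and the parsed
  // reference is returned as-is.
  public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
      throws InterruptedException, IOException {
    TableReference tableReference = parseTableSpec(tableSpec);
    if (createDisposition != Write.CreateDisposition.CREATE_NEVER
        && !createdTables.contains(tableSpec)) {
      synchronized (createdTables) {
        // ... re-check createdTables, create the table if still absent, and
        // record it, exactly as in the existing implementation ...
      }
    }
    return tableReference;
  }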

http://git-wip-us.apache.org/repos/asf/beam/blob/dc369522/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 3e8c2c9..ba7f44e 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -1523,6 +1523,44 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
+  public void testStreamingWriteFnCreateNever() throws Exception {
+    BigQueryIO.StreamingWriteFn fn = new BigQueryIO.StreamingWriteFn(
+        null, CreateDisposition.CREATE_NEVER, new FakeBigQueryServices());
+    assertEquals(BigQueryIO.parseTableSpec("dataset.table"),
+        fn.getOrCreateTable(null, "dataset.table"));
+  }
+
+  @Test
+  public void testCreateNeverWithStreaming() throws Exception {
+    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    options.setProject("project");
+    options.setStreaming(true);
+    Pipeline p = TestPipeline.create(options);
+
+    TableReference tableRef = new TableReference();
+    tableRef.setDatasetId("dataset");
+    tableRef.setTableId("sometable");
+
+    PCollection<TableRow> tableRows =
+        p.apply(CountingInput.unbounded())
+        .apply(
+            MapElements.via(
+                new SimpleFunction<Long, TableRow>() {
+                  @Override
+                  public TableRow apply(Long input) {
+                    return null;
+                  }
+                }))
+        .setCoder(TableRowJsonCoder.of());
+    tableRows
+        .apply(BigQueryIO.Write.to(tableRef)
+            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+            .withoutValidation());
+  }
+
+  @Test
   public void testTableParsing() {
     TableReference ref = BigQueryIO
         .parseTableSpec("my-project:data_set.table_name");