You are viewing a plain-text version of this content; its canonical link was a hyperlink ("here") that this plain-text rendering does not preserve.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:45 UTC

[11/50] [abbrv] beam git commit: Use tableRefFunction throughout BigQueryIO. Constant table writes use ConstantTableSpecFunction.

Use tableRefFunction throughout BigQueryIO. Constant table writes use ConstantTableSpecFunction.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c939a436
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c939a436
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c939a436

Branch: refs/heads/DSL_SQL
Commit: c939a43617cdb37228625a34b3545377b142fc8a
Parents: e0df7d8
Author: Reuven Lax <re...@google.com>
Authored: Tue Mar 28 11:21:59 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 21:12:49 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 57 ++++++++++----------
 .../sdk/io/gcp/bigquery/StreamWithDeDup.java    |  4 +-
 .../gcp/bigquery/TagWithUniqueIdsAndTable.java  | 57 ++++++--------------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 19 ++-----
 4 files changed, 50 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 9753da5..af0d561 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -700,7 +700,8 @@ public class BigQueryIO {
       abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
       abstract Builder<T> setTableRefFunction(
           SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction);
-      abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
+      abstract Builder<T> setFormatFunction(
+          SerializableFunction<T, TableRow> formatFunction);
       abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
       abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);
       abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
@@ -781,7 +782,8 @@ public class BigQueryIO {
     /** Ensures that methods of the to() family are called at most once. */
     private void ensureToNotCalledYet() {
       checkState(
-          getJsonTableRef() == null && getTable() == null, "to() already called");
+          getJsonTableRef() == null && getTable() == null
+              && getTableRefFunction() == null, "to() already called");
     }
 
     /**
@@ -805,6 +807,8 @@ public class BigQueryIO {
               NestedValueProvider.of(
                   NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
                   new TableRefToJson()))
+          .setTableRefFunction(new TranslateTableSpecFunction<T>(
+              new ConstantTableSpecFunction<T>(tableSpec)))
           .build();
     }
 
@@ -812,7 +816,8 @@ public class BigQueryIO {
      * Writes to table specified by the specified table function. The table is a function of
      * {@link ValueInSingleWindow}, so can be determined by the value or by the window.
      */
-    public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
+    public Write<T> to(
+        SerializableFunction<ValueInSingleWindow<T>, String> tableSpecFunction) {
       return toTableReference(new TranslateTableSpecFunction<T>(tableSpecFunction));
     }
 
@@ -848,6 +853,20 @@ public class BigQueryIO {
       }
     }
 
+    /** Maps every element, whatever its value or window, to {@code tableSpec.get()}. */
+    static class ConstantTableSpecFunction<T> implements
+        SerializableFunction<ValueInSingleWindow<T>, String> {
+      private final ValueProvider<String> tableSpec;
+
+      ConstantTableSpecFunction(ValueProvider<String> tableSpec) {
+        this.tableSpec = tableSpec;
+      }
+      @Override
+      public String apply(ValueInSingleWindow<T> value) {
+        return tableSpec.get();
+      }
+    }
+
     /**
      * Uses the specified schema for rows to be written.
      *
@@ -900,13 +919,8 @@ public class BigQueryIO {
       BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
       // Exactly one of the table and table reference can be configured.
-      checkState(
-          getJsonTableRef() != null || getTableRefFunction() != null,
+      checkState(getTableRefFunction() != null,
           "must set the table reference of a BigQueryIO.Write transform");
-      checkState(
-          getJsonTableRef() == null || getTableRefFunction() == null,
-          "Cannot set both a table reference and a table function for a BigQueryIO.Write"
-              + " transform");
 
       checkArgument(getFormatFunction() != null,
                     "A function must be provided to convert type into a TableRow. "
@@ -920,6 +934,7 @@ public class BigQueryIO {
       // The user specified a table.
       if (getJsonTableRef() != null && getValidate()) {
         TableReference table = getTableWithDefaultProject(options).get();
+        // TODO: This seems wrong - what if the ValueProvider is not accessible?
 
         DatasetService datasetService = getBigQueryServices().getDatasetService(options);
         // Check for destination table presence and emptiness for early failure notification.
@@ -935,24 +950,12 @@ public class BigQueryIO {
         }
       }
 
-      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || getTableRefFunction() != null) {
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED) {
         // We will use BigQuery's streaming write API -- validate supported dispositions.
-        if (getTableRefFunction() != null) {
-          checkArgument(
-              getCreateDisposition() != CreateDisposition.CREATE_NEVER,
-              "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
-              + " function.");
-        }
-        if (getJsonSchema() == null) {
-          checkArgument(
-              getCreateDisposition() == CreateDisposition.CREATE_NEVER,
-              "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
-        }
-
         checkArgument(
             getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
-            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
-                + " when using a tablespec function.");
+            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded"
+            + " PCollection.");
       } else {
         // We will use a BigQuery load job -- validate the temp location.
         String tempLocation = options.getTempLocation();
@@ -977,7 +980,7 @@ public class BigQueryIO {
     public WriteResult expand(PCollection<T> input) {
       // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
       // StreamWithDeDup and BigQuery's streaming import API.
-      if (input.isBounded() == IsBounded.UNBOUNDED || getTableRefFunction() != null) {
+      if (input.isBounded() == IsBounded.UNBOUNDED) {
         return input.apply(new StreamWithDeDup<T>(this));
       } else {
         return input.apply(new BatchLoadBigQuery<T>(this));
@@ -1026,12 +1029,12 @@ public class BigQueryIO {
      *
      * <p>If the table's project is not specified, use the executing project.
      */
-    @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
-        BigQueryOptions bqOptions) {
+    @Nullable ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
       ValueProvider<TableReference> table = getTable();
       if (table == null) {
         return table;
       }
+
       if (!table.isAccessible()) {
         LOG.info("Using a dynamic value for table input. This must contain a project"
             + " in the table reference: {}", table);

http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
index 1fa26d1..506a564 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamWithDeDup.java
@@ -64,8 +64,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
 
     PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
         input.apply(ParDo.of(new TagWithUniqueIdsAndTable<T>(
-            input.getPipeline().getOptions().as(BigQueryOptions.class), write.getTable(),
-            write.getTableRefFunction(), write.getFormatFunction())));
+            input.getPipeline().getOptions().as(BigQueryOptions.class), write)));
 
     // To prevent having the same TableRow processed more than once with regenerated
     // different unique ids, this implementation relies on "checkpointing", which is
@@ -85,6 +84,7 @@ class StreamWithDeDup<T> extends PTransform<PCollection<T>, WriteResult> {
                     write.getCreateDisposition(),
                     write.getTableDescription(),
                     write.getBigQueryServices())));
+
     return WriteResult.in(input.getPipeline());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
index a6608e4..8d7d1e6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIdsAndTable.java
@@ -18,23 +18,18 @@
 
 package org.apache.beam.sdk.io.gcp.bigquery;
 
-import static com.google.common.base.Preconditions.checkArgument;
-
 import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
@@ -49,39 +44,22 @@ import org.apache.beam.sdk.values.ValueInSingleWindow;
 @VisibleForTesting
 class TagWithUniqueIdsAndTable<T>
     extends DoFn<T, KV<ShardedKey<String>, TableRowInfo>> {
-  /** TableSpec to write to. */
-  private final ValueProvider<String> tableSpec;
-
-  /** User function mapping windowed values to {@link TableReference} in JSON. */
-  private final SerializableFunction<ValueInSingleWindow<T>, TableReference> tableRefFunction;
+  /** Table spec of the single static destination, or null if no static table was set. */
+  private ValueProvider<String> tableSpec = null;

-  /** User function mapping user type to a TableRow. */
-  private final SerializableFunction<T, TableRow> formatFunction;
+  private final Write<T> write;

   private transient String randomUUID;
   private transient long sequenceNo = 0L;

   TagWithUniqueIdsAndTable(BigQueryOptions options,
-                           ValueProvider<TableReference> table,
-                           SerializableFunction<ValueInSingleWindow<T>, TableReference>
-                               tableRefFunction,
-                           SerializableFunction<T, TableRow> formatFunction) {
-    checkArgument(table == null ^ tableRefFunction == null,
-        "Exactly one of table or tableRefFunction should be set");
+                           Write<T> write) {
+    ValueProvider<TableReference> table =
+        write.getTableWithDefaultProject(options);
     if (table != null) {
-      if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
-        TableReference tableRef = table.get()
-            .setProjectId(options.as(BigQueryOptions.class).getProject());
-        table = NestedValueProvider.of(
-            StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
-            new JsonTableRefToTableRef());
-      }
       this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
-    } else {
-      tableSpec = null;
     }
-    this.tableRefFunction = tableRefFunction;
-    this.formatFunction = formatFunction;
+    this.write = write;
   }
 
 
@@ -101,7 +79,7 @@ class TagWithUniqueIdsAndTable<T>
     // We output on keys 0-50 to ensure that there's enough batching for
     // BigQuery.
     context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)),
-        new TableRowInfo(formatFunction.apply(context.element()), uniqueId)));
+        new TableRowInfo(write.getFormatFunction().apply(context.element()), uniqueId)));
   }
 
   @Override
@@ -109,10 +87,8 @@ class TagWithUniqueIdsAndTable<T>
     super.populateDisplayData(builder);
 
     builder.addIfNotNull(DisplayData.item("table", tableSpec));
-    if (tableRefFunction != null) {
-      builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+    builder.add(DisplayData.item("tableFn", write.getTableRefFunction().getClass())
         .withLabel("Table Reference Function"));
-    }
   }
 
   @VisibleForTesting
@@ -120,16 +96,13 @@ class TagWithUniqueIdsAndTable<T>
     return tableSpec;
   }
 
+
   private String tableSpecFromWindowedValue(BigQueryOptions options,
                                             ValueInSingleWindow<T> value) {
-    if (tableSpec != null) {
-      return tableSpec.get();
-    } else {
-      TableReference table = tableRefFunction.apply(value);
-      if (table.getProjectId() == null) {
-        table.setProjectId(options.getProject());
-      }
-      return BigQueryHelpers.toTableSpec(table);
+    TableReference table = write.getTableRefFunction().apply(value);
+    if (Strings.isNullOrEmpty(table.getProjectId())) {
+      table.setProjectId(options.getProject());
     }
+    return BigQueryHelpers.toTableSpec(table);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c939a436/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 83fd8d9..499aa74 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -26,7 +26,6 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.hasItem;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
@@ -103,7 +102,6 @@ import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
@@ -150,6 +148,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
@@ -1375,7 +1374,8 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteDefaultProject() {
-    BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows().to("somedataset.sometable");
+    BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows()
+        .to("somedataset" + ".sometable");
     checkWriteObject(
         write, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY,
@@ -2350,19 +2350,6 @@ public class BigQueryIOTest implements Serializable {
     DisplayData.from(write);
   }
 
-  @Test
-  public void testTagWithUniqueIdsAndTableProjectNotNullWithNvp() {
-    BigQueryOptions bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class);
-    bqOptions.setProject("project");
-    TagWithUniqueIdsAndTable<TableRow> tag =
-        new TagWithUniqueIdsAndTable<TableRow>(
-            bqOptions, NestedValueProvider.of(
-                StaticValueProvider.of("data_set.table_name"),
-                new TableSpecToTableRef()), null, null);
-    TableReference table = BigQueryHelpers.parseTableSpec(tag.getTableSpec().get());
-    assertNotNull(table.getProjectId());
-  }
-
   private static void testNumFiles(File tempDir, int expectedNumFiles) {
     assertEquals(expectedNumFiles, tempDir.listFiles(new FileFilter() {
       @Override