You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/12/12 19:14:48 UTC

[1/2] incubator-beam git commit: BigQueryIO.Write: support runtime schema and table

Repository: incubator-beam
Updated Branches:
  refs/heads/master 437393712 -> 321547fb1


BigQueryIO.Write: support runtime schema and table


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fd6d09c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fd6d09c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fd6d09c3

Branch: refs/heads/master
Commit: fd6d09c32f6bcf67c63ec74548373ee90d67f2bd
Parents: 4373937
Author: Sam McVeety <sg...@google.com>
Authored: Sun Dec 4 14:16:23 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Dec 12 11:14:20 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 217 +++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  60 ++++-
 2 files changed, 206 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd6d09c3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f99ca78..0be8567 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -321,6 +321,23 @@ public class BigQueryIO {
     return sb.toString();
   }
 
+  @VisibleForTesting
+  static class JsonSchemaToTableSchema
+      implements SerializableFunction<String, TableSchema> {
+    @Override
+    public TableSchema apply(String from) {
+      return fromJsonString(from, TableSchema.class);
+    }
+  }
+
+  private static class TableSchemaToJsonSchema
+      implements SerializableFunction<TableSchema, String> {
+    @Override
+    public String apply(TableSchema from) {
+      return toJsonString(from);
+    }
+  }
+
   private static class JsonTableRefToTableRef
       implements SerializableFunction<String, TableReference> {
     @Override
@@ -329,6 +346,14 @@ public class BigQueryIO {
     }
   }
 
+  private static class TableRefToTableSpec
+      implements SerializableFunction<TableReference, String> {
+    @Override
+    public String apply(TableReference from) {
+      return toTableSpec(from);
+    }
+  }
+
   private static class TableRefToJson
       implements SerializableFunction<TableReference, String> {
     @Override
@@ -353,6 +378,15 @@ public class BigQueryIO {
     }
   }
 
+  @Nullable
+  private static ValueProvider<String> displayTable(
+      @Nullable ValueProvider<TableReference> table) {
+    if (table == null) {
+      return null;
+    }
+    return NestedValueProvider.of(table, new TableRefToTableSpec());
+  }
+
   /**
    * A {@link PTransform} that reads from a BigQuery table and returns a
    * {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table.
@@ -659,11 +693,11 @@ public class BigQueryIO {
               .setProjectId(executingProject)
               .setDatasetId(queryTempDatasetId)
               .setTableId(queryTempTableId);
+          String jsonTableRef = toJsonString(queryTempTableRef);
 
           source = BigQueryQuerySource.create(
               jobIdToken, query, NestedValueProvider.of(
-                  StaticValueProvider.of(
-                      toJsonString(queryTempTableRef)), new JsonTableRefToTableRef()),
+                  StaticValueProvider.of(jsonTableRef), new JsonTableRefToTableRef()),
               flattenResults, useLegacySql, extractDestinationDir, bqServices);
         } else {
           ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
@@ -712,17 +746,10 @@ public class BigQueryIO {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
-        TableReference table = getTable();
-
-        if (table != null) {
-          builder.add(DisplayData.item("table", toTableSpec(table))
-            .withLabel("Table"));
-        }
-        String queryString = query == null
-            ? null : query.isAccessible()
-            ? query.get() : query.toString();
         builder
-            .addIfNotNull(DisplayData.item("query", queryString)
+            .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
+              .withLabel("Table"))
+            .addIfNotNull(DisplayData.item("query", query)
               .withLabel("Query"))
             .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
               .withLabel("Flatten Query Results"))
@@ -752,10 +779,10 @@ public class BigQueryIO {
         if (Strings.isNullOrEmpty(table.get().getProjectId())) {
           // If user does not specify a project we assume the table to be located in
           // the default project.
-          TableReference ref = table.get();
-          ref.setProjectId(bqOptions.getProject());
+          TableReference tableRef = table.get();
+          tableRef.setProjectId(bqOptions.getProject());
           return NestedValueProvider.of(StaticValueProvider.of(
-              toJsonString(ref)), new JsonTableRefToTableRef());
+              toJsonString(tableRef)), new JsonTableRefToTableRef());
         }
         return table;
       }
@@ -941,8 +968,7 @@ public class BigQueryIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      String table = jsonTable.isAccessible() ? jsonTable.get() : jsonTable.toString();
-      builder.add(DisplayData.item("table", table));
+      builder.add(DisplayData.item("table", jsonTable));
     }
   }
 
@@ -1060,7 +1086,7 @@ public class BigQueryIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("query", query.get()));
+      builder.add(DisplayData.item("query", query));
     }
 
     private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
@@ -1516,6 +1542,11 @@ public class BigQueryIO {
     }
 
     /** Creates a write transformation for the given table. */
+    public static Bound to(ValueProvider<String> tableSpec) {
+      return new Bound().to(tableSpec);
+    }
+
+    /** Creates a write transformation for the given table. */
     public static Bound to(TableReference table) {
       return new Bound().to(table);
     }
@@ -1558,6 +1589,13 @@ public class BigQueryIO {
       return new Bound().withSchema(schema);
     }
 
+    /**
+     * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+     */
+    public static Bound withSchema(ValueProvider<TableSchema> schema) {
+      return new Bound().withSchema(schema);
+    }
+
     /** Creates a write transformation with the specified options for creating the table. */
     public static Bound withCreateDisposition(CreateDisposition disposition) {
       return new Bound().withCreateDisposition(disposition);
@@ -1593,12 +1631,12 @@ public class BigQueryIO {
       // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
       static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
-      @Nullable final String jsonTableRef;
+      @Nullable final ValueProvider<String> jsonTableRef;
 
       @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
 
       // Table schema. The schema is required only if the table does not exist.
-      @Nullable final String jsonSchema;
+      @Nullable final ValueProvider<String> jsonSchema;
 
       // Options for creating the table. Valid values are CREATE_IF_NEEDED and
       // CREATE_NEVER.
@@ -1645,9 +1683,9 @@ public class BigQueryIO {
             null /* bigQueryServices */);
       }
 
-      private Bound(String name, @Nullable String jsonTableRef,
+      private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
           @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-          @Nullable String jsonSchema,
+          @Nullable ValueProvider<String> jsonSchema,
           CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
           @Nullable BigQueryServices bigQueryServices) {
         super(name);
@@ -1667,7 +1705,8 @@ public class BigQueryIO {
        * <p>Does not modify this object.
        */
       public Bound to(String tableSpec) {
-        return to(parseTableSpec(tableSpec));
+        return toTableRef(NestedValueProvider.of(
+            StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
       }
 
       /**
@@ -1676,7 +1715,28 @@ public class BigQueryIO {
        * <p>Does not modify this object.
        */
       public Bound to(TableReference table) {
-        return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition,
+        return to(StaticValueProvider.of(toTableSpec(table)));
+      }
+
+      /**
+       * Returns a copy of this write transformation, but writing to the specified table. Refer to
+       * {@link #parseTableSpec(String)} for the specification format.
+       *
+       * <p>Does not modify this object.
+       */
+      public Bound to(ValueProvider<String> tableSpec) {
+        return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
+      }
+
+      /**
+       * Returns a copy of this write transformation, but writing to the specified table.
+       *
+       * <p>Does not modify this object.
+       */
+      private Bound toTableRef(ValueProvider<TableReference> table) {
+        return new Bound(name,
+            NestedValueProvider.of(table, new TableRefToJson()),
+            tableRefFunction, jsonSchema, createDisposition,
             writeDisposition, validate, bigQueryServices);
       }
 
@@ -1716,7 +1776,17 @@ public class BigQueryIO {
        * <p>Does not modify this object.
        */
       public Bound withSchema(TableSchema schema) {
-        return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema),
+        return new Bound(name, jsonTableRef, tableRefFunction,
+            StaticValueProvider.of(toJsonString(schema)),
+            createDisposition, writeDisposition, validate, bigQueryServices);
+      }
+
+      /**
+       * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+       */
+      public Bound withSchema(ValueProvider<TableSchema> schema) {
+        return new Bound(name, jsonTableRef, tableRefFunction,
+            NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
             createDisposition, writeDisposition, validate, bigQueryServices);
       }
 
@@ -1798,7 +1868,7 @@ public class BigQueryIO {
 
         // The user specified a table.
         if (jsonTableRef != null && validate) {
-          TableReference table = getTableWithDefaultProject(options);
+          TableReference table = getTableWithDefaultProject(options).get();
 
           DatasetService datasetService = getBigQueryServices().getDatasetService(options);
           // Check for destination table presence and emptiness for early failure notification.
@@ -1855,10 +1925,11 @@ public class BigQueryIO {
         // StreamWithDeDup and BigQuery's streaming import API.
         if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
           return input.apply(
-              new StreamWithDeDup(getTable(), tableRefFunction, getSchema(), bqServices));
+              new StreamWithDeDup(getTable(), tableRefFunction,
+                  NestedValueProvider.of(jsonSchema, new JsonSchemaToTableSchema()), bqServices));
         }
 
-        TableReference table = getTableWithDefaultProject(options);
+        ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
         String jobIdToken = "beam_job_" + randomUUIDString();
         String tempLocation = options.getTempLocation();
@@ -1909,7 +1980,7 @@ public class BigQueryIO {
                 bqServices,
                 jobIdToken,
                 tempFilePrefix,
-                toJsonString(table),
+                NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 WriteDisposition.WRITE_EMPTY,
                 CreateDisposition.CREATE_IF_NEEDED)));
@@ -1920,7 +1991,7 @@ public class BigQueryIO {
             .of(new WriteRename(
                 bqServices,
                 jobIdToken,
-                toJsonString(table),
+                NestedValueProvider.of(table, new TableRefToJson()),
                 writeDisposition,
                 createDisposition,
                 tempTablesView))
@@ -1934,7 +2005,7 @@ public class BigQueryIO {
                 bqServices,
                 jobIdToken,
                 tempFilePrefix,
-                toJsonString(table),
+                NestedValueProvider.of(table, new TableRefToJson()),
                 jsonSchema,
                 writeDisposition,
                 createDisposition)));
@@ -2031,7 +2102,8 @@ public class BigQueryIO {
 
       /** Returns the table schema. */
       public TableSchema getSchema() {
-        return fromJsonString(jsonSchema, TableSchema.class);
+        return fromJsonString(
+            jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
       }
 
       /**
@@ -2039,20 +2111,32 @@ public class BigQueryIO {
        *
        * <p>If the table's project is not specified, use the executing project.
        */
-      @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
-        TableReference table = getTable();
-        if (table != null && Strings.isNullOrEmpty(table.getProjectId())) {
+      @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+          BigQueryOptions bqOptions) {
+        ValueProvider<TableReference> table = getTable();
+        if (table == null) {
+          return table;
+        }
+        if (!table.isAccessible()) {
+          LOG.info("Using a dynamic value for table input. This must contain a project"
+              + " in the table reference: {}", table);
+          return table;
+        }
+        if (Strings.isNullOrEmpty(table.get().getProjectId())) {
           // If user does not specify a project we assume the table to be located in
           // the default project.
-          table.setProjectId(bqOptions.getProject());
+          TableReference tableRef = table.get();
+          tableRef.setProjectId(bqOptions.getProject());
+          return NestedValueProvider.of(StaticValueProvider.of(
+              toJsonString(tableRef)), new JsonTableRefToTableRef());
         }
         return table;
       }
 
       /** Returns the table reference, or {@code null}. */
       @Nullable
-      public TableReference getTable() {
-        return fromJsonString(jsonTableRef, TableReference.class);
+      public ValueProvider<TableReference> getTable() {
+        return NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
       }
 
       /** Returns {@code true} if table validation is enabled. */
@@ -2172,8 +2256,8 @@ public class BigQueryIO {
       private final BigQueryServices bqServices;
       private final String jobIdToken;
       private final String tempFilePrefix;
-      private final String jsonTableRef;
-      private final String jsonSchema;
+      private final ValueProvider<String> jsonTableRef;
+      private final ValueProvider<String> jsonSchema;
       private final WriteDisposition writeDisposition;
       private final CreateDisposition createDisposition;
 
@@ -2182,8 +2266,8 @@ public class BigQueryIO {
           BigQueryServices bqServices,
           String jobIdToken,
           String tempFilePrefix,
-          String jsonTableRef,
-          String jsonSchema,
+          ValueProvider<String> jsonTableRef,
+          ValueProvider<String> jsonSchema,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition) {
         this.singlePartition = singlePartition;
@@ -2200,7 +2284,7 @@ public class BigQueryIO {
       public void processElement(ProcessContext c) throws Exception {
         List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
         String jobIdPrefix = String.format(jobIdToken + "_%05d", c.element().getKey());
-        TableReference ref = fromJsonString(jsonTableRef, TableReference.class);
+        TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class);
         if (!singlePartition) {
           ref.setTableId(jobIdPrefix);
         }
@@ -2209,7 +2293,8 @@ public class BigQueryIO {
             bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
             jobIdPrefix,
             ref,
-            fromJsonString(jsonSchema, TableSchema.class),
+            fromJsonString(
+                jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
             partition,
             writeDisposition,
             createDisposition);
@@ -2242,16 +2327,15 @@ public class BigQueryIO {
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startLoadJob(jobRef, loadConfig);
-          Job job = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
-          Status jobStatus = parseStatus(job);
+          Status jobStatus =
+              parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES));
           switch (jobStatus) {
             case SUCCEEDED:
               return;
             case UNKNOWN:
               throw new RuntimeException("Failed to poll the load job status of job " + jobId);
             case FAILED:
-              LOG.info("BigQuery load job failed. Status: {} Details: {}",
-                  jobId, job.getStatus());
+              LOG.info("BigQuery load job failed: {}", jobId);
               continue;
             default:
               throw new IllegalStateException(String.format("Unexpected job status: %s of job %s",
@@ -2306,7 +2390,7 @@ public class BigQueryIO {
     static class WriteRename extends DoFn<String, Void> {
       private final BigQueryServices bqServices;
       private final String jobIdToken;
-      private final String jsonTableRef;
+      private final ValueProvider<String> jsonTableRef;
       private final WriteDisposition writeDisposition;
       private final CreateDisposition createDisposition;
       private final PCollectionView<Iterable<String>> tempTablesView;
@@ -2314,7 +2398,7 @@ public class BigQueryIO {
       public WriteRename(
           BigQueryServices bqServices,
           String jobIdToken,
-          String jsonTableRef,
+          ValueProvider<String> jsonTableRef,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition,
           PCollectionView<Iterable<String>> tempTablesView) {
@@ -2342,7 +2426,7 @@ public class BigQueryIO {
         copy(
             bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
             jobIdToken,
-            fromJsonString(jsonTableRef, TableReference.class),
+            fromJsonString(jsonTableRef.get(), TableReference.class),
             tempTables,
             writeDisposition,
             createDisposition);
@@ -2475,7 +2559,7 @@ public class BigQueryIO {
   private static class StreamingWriteFn
       extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
     /** TableSchema in JSON. Use String to make the class Serializable. */
-    private final String jsonTableSchema;
+    private final ValueProvider<String> jsonTableSchema;
 
     private final BigQueryServices bqServices;
 
@@ -2495,8 +2579,9 @@ public class BigQueryIO {
         createAggregator("ByteCount", new Sum.SumLongFn());
 
     /** Constructor. */
-    StreamingWriteFn(TableSchema schema, BigQueryServices bqServices) {
-      this.jsonTableSchema = toJsonString(schema);
+    StreamingWriteFn(ValueProvider<TableSchema> schema, BigQueryServices bqServices) {
+      this.jsonTableSchema =
+          NestedValueProvider.of(schema, new TableSchemaToJsonSchema());
       this.bqServices = checkNotNull(bqServices, "bqServices");
     }
 
@@ -2549,7 +2634,8 @@ public class BigQueryIO {
           // check again. This check isn't needed for correctness, but we add it to prevent
           // every thread from attempting a create and overwhelming our BigQuery quota.
           if (!createdTables.contains(tableSpec)) {
-            TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
+            TableSchema tableSchema = JSON_FACTORY.fromString(
+                jsonTableSchema.get(), TableSchema.class);
             Bigquery client = Transport.newBigQueryClient(options).build();
             BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
             inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND,
@@ -2708,7 +2794,7 @@ public class BigQueryIO {
   private static class TagWithUniqueIdsAndTable
       extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>> {
     /** TableSpec to write to. */
-    private final String tableSpec;
+    private final ValueProvider<String> tableSpec;
 
     /** User function mapping windows to {@link TableReference} in JSON. */
     private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
@@ -2716,15 +2802,16 @@ public class BigQueryIO {
     private transient String randomUUID;
     private transient long sequenceNo = 0L;
 
-    TagWithUniqueIdsAndTable(BigQueryOptions options, TableReference table,
+    TagWithUniqueIdsAndTable(BigQueryOptions options,
+        ValueProvider<TableReference> table,
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
       checkArgument(table == null ^ tableRefFunction == null,
           "Exactly one of table or tableRefFunction should be set");
       if (table != null) {
-        if (table.getProjectId() == null) {
-          table.setProjectId(options.as(BigQueryOptions.class).getProject());
+        if (table.isAccessible() && table.get().getProjectId() == null) {
+          table.get().setProjectId(options.as(BigQueryOptions.class).getProject());
         }
-        this.tableSpec = toTableSpec(table);
+        this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
       } else {
         tableSpec = null;
       }
@@ -2763,7 +2850,7 @@ public class BigQueryIO {
 
     private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
       if (tableSpec != null) {
-        return tableSpec;
+        return tableSpec.get();
       } else {
         TableReference table = tableRefFunction.apply(window);
         if (table.getProjectId() == null) {
@@ -2781,15 +2868,15 @@ public class BigQueryIO {
   * it leverages BigQuery best effort de-dup mechanism.
    */
   private static class StreamWithDeDup extends PTransform<PCollection<TableRow>, PDone> {
-    private final transient TableReference tableReference;
+    private final transient ValueProvider<TableReference> tableReference;
     private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-    private final transient TableSchema tableSchema;
+    private final transient ValueProvider<TableSchema> tableSchema;
     private final BigQueryServices bqServices;
 
     /** Constructor. */
-    StreamWithDeDup(TableReference tableReference,
+    StreamWithDeDup(ValueProvider<TableReference> tableReference,
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-        TableSchema tableSchema,
+        ValueProvider<TableSchema> tableSchema,
         BigQueryServices bqServices) {
       this.tableReference = tableReference;
       this.tableRefFunction = tableRefFunction;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fd6d09c3/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 25caf63..54ec2bb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -95,6 +95,7 @@ import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.JsonSchemaToTableSchema;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
@@ -111,6 +112,8 @@ import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.NeedsRunner;
@@ -643,9 +646,9 @@ public class BigQueryIOTest implements Serializable {
       BigQueryIO.Write.Bound bound, String project, String dataset, String table,
       TableSchema schema, CreateDisposition createDisposition,
       WriteDisposition writeDisposition, boolean validate) {
-    assertEquals(project, bound.getTable().getProjectId());
-    assertEquals(dataset, bound.getTable().getDatasetId());
-    assertEquals(table, bound.getTable().getTableId());
+    assertEquals(project, bound.getTable().get().getProjectId());
+    assertEquals(dataset, bound.getTable().get().getDatasetId());
+    assertEquals(table, bound.getTable().get().getTableId());
     assertEquals(schema, bound.getSchema());
     assertEquals(createDisposition, bound.createDisposition);
     assertEquals(writeDisposition, bound.writeDisposition);
@@ -1845,8 +1848,8 @@ public class BigQueryIOTest implements Serializable {
         fakeBqServices,
         jobIdToken,
         tempFilePrefix,
-        jsonTable,
-        jsonSchema,
+        StaticValueProvider.of(jsonTable),
+        StaticValueProvider.of(jsonSchema),
         WriteDisposition.WRITE_EMPTY,
         CreateDisposition.CREATE_IF_NEEDED);
 
@@ -1920,7 +1923,7 @@ public class BigQueryIOTest implements Serializable {
     WriteRename writeRename = new WriteRename(
         fakeBqServices,
         jobIdToken,
-        jsonTable,
+        StaticValueProvider.of(jsonTable),
         WriteDisposition.WRITE_EMPTY,
         CreateDisposition.CREATE_IF_NEEDED,
         tempTablesView);
@@ -1961,6 +1964,51 @@ public class BigQueryIOTest implements Serializable {
     logged.verifyNotLogged("Failed to delete the table " + toJsonString(tableRefs.get(2)));
   }
 
+  /** Test options. **/
+  public interface RuntimeTestOptions extends PipelineOptions {
+    ValueProvider<String> getInputTable();
+    void setInputTable(ValueProvider<String> value);
+
+    ValueProvider<String> getInputQuery();
+    void setInputQuery(ValueProvider<String> value);
+
+    ValueProvider<String> getOutputTable();
+    void setOutputTable(ValueProvider<String> value);
+
+    ValueProvider<String> getOutputSchema();
+    void setOutputSchema(ValueProvider<String> value);
+  }
+
+  @Test
+  public void testRuntimeOptionsNotCalledInApplyInputTable() {
+    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+    Pipeline pipeline = TestPipeline.create(options);
+    pipeline
+        .apply(BigQueryIO.Read.from(options.getInputTable()).withoutValidation())
+        .apply(BigQueryIO.Write
+            .to(options.getOutputTable())
+            .withSchema(NestedValueProvider.of(
+                options.getOutputSchema(), new JsonSchemaToTableSchema()))
+            .withoutValidation());
+  }
+
+  @Test
+  public void testRuntimeOptionsNotCalledInApplyInputQuery() {
+    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    bqOptions.setTempLocation("gs://testbucket/testdir");
+    Pipeline pipeline = TestPipeline.create(options);
+    pipeline
+        .apply(BigQueryIO.Read.fromQuery(options.getInputQuery()).withoutValidation())
+        .apply(BigQueryIO.Write
+            .to(options.getOutputTable())
+            .withSchema(NestedValueProvider.of(
+                options.getOutputSchema(), new JsonSchemaToTableSchema()))
+            .withoutValidation());
+  }
+
   private static void testNumFiles(File tempDir, int expectedNumFiles) {
     assertEquals(expectedNumFiles, tempDir.listFiles(new FileFilter() {
       @Override


[2/2] incubator-beam git commit: Closes #1513

Posted by dh...@apache.org.
Closes #1513


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/321547fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/321547fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/321547fb

Branch: refs/heads/master
Commit: 321547fb15c358fcd196954779548f6644aa3c08
Parents: 4373937 fd6d09c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Dec 12 11:14:41 2016 -0800
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Dec 12 11:14:41 2016 -0800

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 217 +++++++++++++------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  60 ++++-
 2 files changed, 206 insertions(+), 71 deletions(-)
----------------------------------------------------------------------