Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/14 22:54:57 UTC

[03/10] beam git commit: Condense BigQueryIO.Write.Bound into BigQueryIO.Write

Condense BigQueryIO.Write.Bound into BigQueryIO.Write


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d1f4400
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d1f4400
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d1f4400

Branch: refs/heads/master
Commit: 7d1f4400ab844c7b4e636482891be55174390431
Parents: 825338a
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Mar 2 17:26:09 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Mar 14 15:54:24 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 1080 +++++++++---------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  116 +-
 2 files changed, 552 insertions(+), 644 deletions(-)
----------------------------------------------------------------------
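
For orientation: this change removes the BigQueryIO.Write.Bound indirection.
The builder methods that previously lived on the nested Bound class now live
directly on BigQueryIO.Write, which itself extends
PTransform<PCollection<TableRow>, PDone>. A minimal caller-side sketch of the
condensed API follows; the wrapper class, helper method, table spec string,
and schema argument are illustrative placeholders, not part of this commit:

    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    class WriteExample {
      static PDone writeRows(PCollection<TableRow> rows, TableSchema schema) {
        // The fluent methods now return BigQueryIO.Write directly, so no
        // Bound type appears anywhere in caller code.
        return rows.apply(BigQueryIO.Write
            .to("project:dataset.table")             // destination table spec
            .withSchema(schema)                      // required for CREATE_IF_NEEDED
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_EMPTY));
      }
    }

As the test diff at the bottom shows, existing call sites only need their
declared type changed from BigQueryIO.Write.Bound to BigQueryIO.Write.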


http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f6c8575..90d7f67 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -1415,7 +1415,43 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Write {
+  public static class Write extends PTransform<PCollection<TableRow>, PDone> {
+    // Maximum number of files in a single partition.
+    static final int MAX_NUM_FILES = 10000;
+
+    // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
+    static final long MAX_SIZE_BYTES = 11 * (1L << 40);
+
+    // The maximum number of retry jobs.
+    private static final int MAX_RETRY_JOBS = 3;
+
+    // The maximum number of retries to poll the status of a job.
+    // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
+    private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+    @Nullable private final ValueProvider<String> jsonTableRef;
+
+    @Nullable private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
+
+    // Table schema. The schema is required only if the table does not exist.
+    @Nullable private final ValueProvider<String> jsonSchema;
+
+    // Options for creating the table. Valid values are CREATE_IF_NEEDED and
+    // CREATE_NEVER.
+    final CreateDisposition createDisposition;
+
+    // Options for writing to the table. Valid values are WRITE_TRUNCATE,
+    // WRITE_APPEND and WRITE_EMPTY.
+    final WriteDisposition writeDisposition;
+
+    @Nullable
+    final String tableDescription;
+
+    // An option to indicate if table validation is desired. Default is true.
+    final boolean validate;
+
+    @Nullable private BigQueryServices bigQueryServices;
+
     /**
      * An enumeration type for the BigQuery create disposition strings.
      *
@@ -1488,18 +1524,18 @@ public class BigQueryIO {
      *
      * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
      */
-    public static Bound to(String tableSpec) {
-      return new Bound().to(tableSpec);
+    public static Write to(String tableSpec) {
+      return new Write().withTableSpec(tableSpec);
     }
 
     /** Creates a write transformation for the given table. */
-    public static Bound to(ValueProvider<String> tableSpec) {
-      return new Bound().to(tableSpec);
+    public static Write to(ValueProvider<String> tableSpec) {
+      return new Write().withTableSpec(tableSpec);
     }
 
     /** Creates a write transformation for the given table. */
-    public static Bound to(TableReference table) {
-      return new Bound().to(table);
+    public static Write to(TableReference table) {
+      return new Write().withTableRef(table);
     }
 
     /**
@@ -1513,8 +1549,8 @@ public class BigQueryIO {
      * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
      * always return the same table specification.
      */
-    public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-      return new Bound().to(tableSpecFunction);
+    public static Write to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+      return new Write().withTableSpec(tableSpecFunction);
     }
 
     /**
@@ -1524,634 +1560,547 @@ public class BigQueryIO {
      * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
      * always return the same table reference.
      */
-    public static Bound toTableReference(
+    private static Write toTableReference(
         SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return new Bound().toTableReference(tableRefFunction);
+      return new Write().withTableRef(tableRefFunction);
+    }
+
+    private static class TranslateTableSpecFunction implements
+        SerializableFunction<BoundedWindow, TableReference> {
+      private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+
+      TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+        this.tableSpecFunction = tableSpecFunction;
+      }
+
+      @Override
+      public TableReference apply(BoundedWindow value) {
+        return parseTableSpec(tableSpecFunction.apply(value));
+      }
+    }
+
+    private Write() {
+      this(
+          null /* name */,
+          null /* jsonTableRef */,
+          null /* tableRefFunction */,
+          null /* jsonSchema */,
+          CreateDisposition.CREATE_IF_NEEDED,
+          WriteDisposition.WRITE_EMPTY,
+          null /* tableDescription */,
+          true /* validate */,
+          null /* bigQueryServices */);
+    }
+
+    private Write(String name, @Nullable ValueProvider<String> jsonTableRef,
+        @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+        @Nullable ValueProvider<String> jsonSchema,
+        CreateDisposition createDisposition,
+        WriteDisposition writeDisposition,
+        @Nullable String tableDescription,
+        boolean validate,
+        @Nullable BigQueryServices bigQueryServices) {
+      super(name);
+      this.jsonTableRef = jsonTableRef;
+      this.tableRefFunction = tableRefFunction;
+      this.jsonSchema = jsonSchema;
+      this.createDisposition = checkNotNull(createDisposition, "createDisposition");
+      this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+      this.tableDescription = tableDescription;
+      this.validate = validate;
+      this.bigQueryServices = bigQueryServices;
     }
 
     /**
-     * Creates a write transformation with the specified schema to use in table creation.
+     * Returns a copy of this write transformation, but writing to the specified table. Refer to
+     * {@link #parseTableSpec(String)} for the specification format.
      *
-     * <p>The schema is <i>required</i> only if writing to a table that does not already
-     * exist, and {@link CreateDisposition} is set to
-     * {@link CreateDisposition#CREATE_IF_NEEDED}.
+     * <p>Does not modify this object.
      */
-    public static Bound withSchema(TableSchema schema) {
-      return new Bound().withSchema(schema);
+    private Write withTableSpec(String tableSpec) {
+      return withTableRef(NestedValueProvider.of(
+          StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
     }
 
     /**
-     * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+     * Returns a copy of this write transformation, but writing to the specified table.
+     *
+     * <p>Does not modify this object.
      */
-    public static Bound withSchema(ValueProvider<TableSchema> schema) {
-      return new Bound().withSchema(schema);
+    public Write withTableRef(TableReference table) {
+      return withTableSpec(StaticValueProvider.of(toTableSpec(table)));
     }
 
-    /** Creates a write transformation with the specified options for creating the table. */
-    public static Bound withCreateDisposition(CreateDisposition disposition) {
-      return new Bound().withCreateDisposition(disposition);
+    /**
+     * Returns a copy of this write transformation, but writing to the specified table. Refer to
+     * {@link #parseTableSpec(String)} for the specification format.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableSpec(ValueProvider<String> tableSpec) {
+      return withTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
     }
 
-    /** Creates a write transformation with the specified options for writing to the table. */
-    public static Bound withWriteDisposition(WriteDisposition disposition) {
-      return new Bound().withWriteDisposition(disposition);
+    /**
+     * Returns a copy of this write transformation, but writing to the specified table.
+     *
+     * <p>Does not modify this object.
+     */
+    private Write withTableRef(ValueProvider<TableReference> table) {
+      return new Write(name,
+          NestedValueProvider.of(table, new TableRefToJson()),
+          tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, validate, bigQueryServices);
     }
 
-    /** Creates a write transformation with the specified table description. */
-    public static Bound withTableDescription(@Nullable String tableDescription) {
-      return new Bound().withTableDescription(tableDescription);
+    /**
+     * Returns a copy of this write transformation, but using the specified function to determine
+     * which table to write to for each window.
+     *
+     * <p>Does not modify this object.
+     *
+     * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
+     * should always return the same table specification.
+     */
+    private Write withTableSpec(
+        SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+      return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
     }
 
     /**
-     * Creates a write transformation with BigQuery table validation disabled.
+     * Returns a copy of this write transformation, but using the specified function to determine
+     * which table to write to for each window.
+     *
+     * <p>Does not modify this object.
+     *
+     * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
+     * always return the same table reference.
      */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
+    private Write withTableRef(
+        SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, validate, bigQueryServices);
     }
 
     /**
-     * A {@link PTransform} that can write either a bounded or unbounded
-     * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
+     * Returns a copy of this write transformation, but using the specified schema for rows
+     * to be written.
+     *
+     * <p>Does not modify this object.
      */
-    public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
-      // Maximum number of files in a single partition.
-      static final int MAX_NUM_FILES = 10000;
-
-      // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
-      static final long MAX_SIZE_BYTES = 11 * (1L << 40);
-
-      // The maximum number of retry jobs.
-      static final int MAX_RETRY_JOBS = 3;
-
-      // The maximum number of retries to poll the status of a job.
-      // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
-      static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
-      @Nullable final ValueProvider<String> jsonTableRef;
-
-      @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
-      // Table schema. The schema is required only if the table does not exist.
-      @Nullable final ValueProvider<String> jsonSchema;
+    public Write withSchema(TableSchema schema) {
+      return new Write(name, jsonTableRef, tableRefFunction,
+          StaticValueProvider.of(toJsonString(schema)),
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      // Options for creating the table. Valid values are CREATE_IF_NEEDED and
-      // CREATE_NEVER.
-      final CreateDisposition createDisposition;
+    /**
+     * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
+     */
+    public Write withSchema(ValueProvider<TableSchema> schema) {
+      return new Write(name, jsonTableRef, tableRefFunction,
+          NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      // Options for writing to the table. Valid values are WRITE_TRUNCATE,
-      // WRITE_APPEND and WRITE_EMPTY.
-      final WriteDisposition writeDisposition;
+    /**
+     * Returns a copy of this write transformation, but using the specified create disposition.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withCreateDisposition(CreateDisposition createDisposition) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      @Nullable final String tableDescription;
+    /**
+     * Returns a copy of this write transformation, but using the specified write disposition.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withWriteDisposition(WriteDisposition writeDisposition) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      // An option to indicate if table validation is desired. Default is true.
-      final boolean validate;
+    /**
+     * Returns a copy of this write transformation, but using the specified table description.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withTableDescription(@Nullable String tableDescription) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema,
+          createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+    }
 
-      @Nullable private BigQueryServices bigQueryServices;
+    /**
+     * Returns a copy of this write transformation, but without BigQuery table validation.
+     *
+     * <p>Does not modify this object.
+     */
+    public Write withoutValidation() {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, false, bigQueryServices);
+    }
 
-      private static class TranslateTableSpecFunction implements
-          SerializableFunction<BoundedWindow, TableReference> {
-        private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+    @VisibleForTesting
+    Write withTestServices(BigQueryServices testServices) {
+      return new Write(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+          writeDisposition, tableDescription, validate, testServices);
+    }
 
-        TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-          this.tableSpecFunction = tableSpecFunction;
+    private static void verifyTableNotExistOrEmpty(
+        DatasetService datasetService,
+        TableReference tableRef) {
+      try {
+        if (datasetService.getTable(tableRef) != null) {
+          checkState(
+              datasetService.isTableEmpty(tableRef),
+              "BigQuery table is not empty: %s.",
+              BigQueryIO.toTableSpec(tableRef));
         }
-
-        @Override
-        public TableReference apply(BoundedWindow value) {
-          return parseTableSpec(tableSpecFunction.apply(value));
+      } catch (IOException | InterruptedException e) {
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
         }
+        throw new RuntimeException(
+            "unable to confirm BigQuery table emptiness for table "
+                + BigQueryIO.toTableSpec(tableRef), e);
       }
+    }
 
-      /**
-       * @deprecated Should be private. Instead, use one of the factory methods in
-       * {@link BigQueryIO.Write}, such as {@link BigQueryIO.Write#to(String)}, to create an
-       * instance of this class.
-       */
-      @Deprecated
-      public Bound() {
-        this(
-            null /* name */,
-            null /* jsonTableRef */,
-            null /* tableRefFunction */,
-            null /* jsonSchema */,
-            CreateDisposition.CREATE_IF_NEEDED,
-            WriteDisposition.WRITE_EMPTY,
-            null /* tableDescription */,
-            true /* validate */,
-            null /* bigQueryServices */);
-      }
-
-      private Bound(String name, @Nullable ValueProvider<String> jsonTableRef,
-          @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-          @Nullable ValueProvider<String> jsonSchema,
-          CreateDisposition createDisposition,
-          WriteDisposition writeDisposition,
-          @Nullable String tableDescription,
-          boolean validate,
-          @Nullable BigQueryServices bigQueryServices) {
-        super(name);
-        this.jsonTableRef = jsonTableRef;
-        this.tableRefFunction = tableRefFunction;
-        this.jsonSchema = jsonSchema;
-        this.createDisposition = checkNotNull(createDisposition, "createDisposition");
-        this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
-        this.tableDescription = tableDescription;
-        this.validate = validate;
-        this.bigQueryServices = bigQueryServices;
-      }
-
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table. Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(String tableSpec) {
-        return toTableRef(NestedValueProvider.of(
-            StaticValueProvider.of(tableSpec), new TableSpecToTableRef()));
-      }
+    @Override
+    public void validate(PCollection<TableRow> input) {
+      BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
 
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(TableReference table) {
-        return to(StaticValueProvider.of(toTableSpec(table)));
-      }
+      // Exactly one of a table reference and a table function must be configured.
+      checkState(
+          jsonTableRef != null || tableRefFunction != null,
+          "must set the table reference of a BigQueryIO.Write transform");
+      checkState(
+          jsonTableRef == null || tableRefFunction == null,
+          "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+              + " transform");
 
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table. Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(ValueProvider<String> tableSpec) {
-        return toTableRef(NestedValueProvider.of(tableSpec, new TableSpecToTableRef()));
+      // Require a schema if creating one or more tables.
+      checkArgument(
+          createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+          "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+      // The user specified a table.
+      if (jsonTableRef != null && validate) {
+        TableReference table = getTableWithDefaultProject(options).get();
+
+        DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+        // Check for destination table presence and emptiness for early failure notification.
+        // Note that a presence check can fail when the table or dataset is created by an earlier
+        // stage of the pipeline. For these cases the #withoutValidation method can be used to
+        // disable the check.
+        verifyDatasetPresence(datasetService, table);
+        if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+          verifyTablePresence(datasetService, table);
+        }
+        if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+          verifyTableNotExistOrEmpty(datasetService, table);
+        }
       }
 
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      private Bound toTableRef(ValueProvider<TableReference> table) {
-        return new Bound(name,
-            NestedValueProvider.of(table, new TableRefToJson()),
-            tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+        // We will use BigQuery's streaming write API -- validate supported dispositions.
+        if (tableRefFunction != null) {
+          checkArgument(
+              createDisposition != CreateDisposition.CREATE_NEVER,
+              "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
+              + " function.");
+        }
+        if (jsonSchema == null) {
+          checkArgument(
+              createDisposition == CreateDisposition.CREATE_NEVER,
+              "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
+        }
 
-      /**
-       * Returns a copy of this write transformation, but using the specified function to determine
-       * which table to write to for each window.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
-       * should always return the same table specification.
-       */
-      public Bound to(
-          SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-        return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
+        checkArgument(
+            writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+            "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+                + " when using a tablespec function.");
+      } else {
+        // We will use a BigQuery load job -- validate the temp location.
+        String tempLocation = options.getTempLocation();
+        checkArgument(
+            !Strings.isNullOrEmpty(tempLocation),
+            "BigQueryIO.Write needs a GCS temp location to store temp files.");
+        if (bigQueryServices == null) {
+          try {
+            GcsPath.fromUri(tempLocation);
+          } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+                    tempLocation),
+                e);
+          }
+        }
       }
+    }
 
-      /**
-       * Returns a copy of this write transformation, but using the specified function to determine
-       * which table to write to for each window.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
-       * always return the same table reference.
-       */
-      public Bound toTableReference(
-          SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, validate, bigQueryServices);
+    @Override
+    public PDone expand(PCollection<TableRow> input) {
+      Pipeline p = input.getPipeline();
+      BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
+      BigQueryServices bqServices = getBigQueryServices();
+
+      // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
+      // StreamWithDeDup and BigQuery's streaming import API.
+      if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
+        return input.apply(
+            new StreamWithDeDup(getTable(), tableRefFunction,
+                jsonSchema == null ? null : NestedValueProvider.of(
+                    jsonSchema, new JsonSchemaToTableSchema()),
+                createDisposition,
+                tableDescription,
+                bqServices));
       }
 
-      /**
-       * Returns a copy of this write transformation, but using the specified schema for rows
-       * to be written.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withSchema(TableSchema schema) {
-        return new Bound(name, jsonTableRef, tableRefFunction,
-            StaticValueProvider.of(toJsonString(schema)),
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
-      /**
-       * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}.
-       */
-      public Bound withSchema(ValueProvider<TableSchema> schema) {
-        return new Bound(name, jsonTableRef, tableRefFunction,
-            NestedValueProvider.of(schema, new TableSchemaToJsonSchema()),
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      String stepUuid = randomUUIDString();
 
-      /**
-       * Returns a copy of this write transformation, but using the specified create disposition.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withCreateDisposition(CreateDisposition createDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
+      String tempLocation = options.getTempLocation();
+      String tempFilePrefix;
+      try {
+        IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+        tempFilePrefix = factory.resolve(
+                factory.resolve(tempLocation, "BigQueryWriteTemp"),
+                stepUuid);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+            e);
       }
 
-      /**
-       * Returns a copy of this write transformation, but using the specified write disposition.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withWriteDisposition(WriteDisposition writeDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      // Create a singleton job ID token at execution time.
+      PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
+      PCollectionView<String> jobIdTokenView = p
+          .apply("TriggerIdCreation", Create.of("ignored"))
+          .apply("CreateJobId", MapElements.via(
+              new SimpleFunction<String, String>() {
+                @Override
+                public String apply(String input) {
+                  return randomUUIDString();
+                }
+              }))
+          .apply(View.<String>asSingleton());
+
+      PCollection<TableRow> inputInGlobalWindow =
+          input.apply(
+              Window.<TableRow>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+
+      PCollection<KV<String, Long>> results = inputInGlobalWindow
+          .apply("WriteBundles",
+              ParDo.of(new WriteBundles(tempFilePrefix)));
+
+      TupleTag<KV<Long, List<String>>> multiPartitionsTag =
+          new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
+      TupleTag<KV<Long, List<String>>> singlePartitionTag =
+          new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
+
+      PCollectionView<Iterable<KV<String, Long>>> resultsView = results
+          .apply("ResultsView", View.<KV<String, Long>>asIterable());
+      PCollectionTuple partitions = singleton.apply(ParDo
+          .of(new WritePartition(
+              resultsView,
+              multiPartitionsTag,
+              singlePartitionTag))
+          .withSideInputs(resultsView)
+          .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
+
+      // Write multiple partitions to separate temporary tables
+      PCollection<String> tempTables = partitions.get(multiPartitionsTag)
+          .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
+          .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
+              false,
+              bqServices,
+              jobIdTokenView,
+              tempFilePrefix,
+              NestedValueProvider.of(table, new TableRefToJson()),
+              jsonSchema,
+              WriteDisposition.WRITE_EMPTY,
+              CreateDisposition.CREATE_IF_NEEDED,
+              tableDescription))
+          .withSideInputs(jobIdTokenView));
+
+      PCollectionView<Iterable<String>> tempTablesView = tempTables
+          .apply("TempTablesView", View.<String>asIterable());
+      singleton.apply(ParDo
+          .of(new WriteRename(
+              bqServices,
+              jobIdTokenView,
+              NestedValueProvider.of(table, new TableRefToJson()),
+              writeDisposition,
+              createDisposition,
+              tempTablesView,
+              tableDescription))
+          .withSideInputs(tempTablesView, jobIdTokenView));
+
+      // Write single partition to final table
+      partitions.get(singlePartitionTag)
+          .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
+          .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
+              true,
+              bqServices,
+              jobIdTokenView,
+              tempFilePrefix,
+              NestedValueProvider.of(table, new TableRefToJson()),
+              jsonSchema,
+              writeDisposition,
+              createDisposition,
+              tableDescription))
+          .withSideInputs(jobIdTokenView));
 
-      /**
-       * Returns a copy of this write transformation, but using the specified table description.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withTableDescription(@Nullable String tableDescription) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, tableDescription, validate, bigQueryServices);
-      }
+      return PDone.in(input.getPipeline());
+    }
 
-      /**
-       * Returns a copy of this write transformation, but without BigQuery table validation.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutValidation() {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, false, bigQueryServices);
-      }
+    private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
+      private transient TableRowWriter writer = null;
+      private final String tempFilePrefix;
 
-      @VisibleForTesting
-      Bound withTestServices(BigQueryServices testServices) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, tableDescription, validate, testServices);
+      WriteBundles(String tempFilePrefix) {
+        this.tempFilePrefix = tempFilePrefix;
       }
 
-      private static void verifyTableNotExistOrEmpty(
-          DatasetService datasetService,
-          TableReference tableRef) {
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        if (writer == null) {
+          writer = new TableRowWriter(tempFilePrefix);
+          writer.open(UUID.randomUUID().toString());
+          LOG.debug("Done opening writer {}", writer);
+        }
         try {
-          if (datasetService.getTable(tableRef) != null) {
-            checkState(
-                datasetService.isTableEmpty(tableRef),
-                "BigQuery table is not empty: %s.",
-                BigQueryIO.toTableSpec(tableRef));
-          }
-        } catch (IOException | InterruptedException e) {
-          if (e instanceof InterruptedException) {
-            Thread.currentThread().interrupt();
+          writer.write(c.element());
+        } catch (Exception e) {
+          // Discard write result and close the write.
+          try {
+            writer.close();
+            // The writer does not need to be reset, as this DoFn cannot be reused.
+          } catch (Exception closeException) {
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
           }
-          throw new RuntimeException(
-              "unable to confirm BigQuery table emptiness for table "
-                  + BigQueryIO.toTableSpec(tableRef), e);
+          throw e;
         }
       }
 
-      @Override
-      public void validate(PCollection<TableRow> input) {
-        BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-        // Exactly one of the table and table reference can be configured.
-        checkState(
-            jsonTableRef != null || tableRefFunction != null,
-            "must set the table reference of a BigQueryIO.Write transform");
-        checkState(
-            jsonTableRef == null || tableRefFunction == null,
-            "Cannot set both a table reference and a table function for a BigQueryIO.Write"
-                + " transform");
-
-        // Require a schema if creating one or more tables.
-        checkArgument(
-            createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
-            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
-        // The user specified a table.
-        if (jsonTableRef != null && validate) {
-          TableReference table = getTableWithDefaultProject(options).get();
-
-          DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-          // Check for destination table presence and emptiness for early failure notification.
-          // Note that a presence check can fail when the table or dataset is created by an earlier
-          // stage of the pipeline. For these cases the #withoutValidation method can be used to
-          // disable the check.
-          verifyDatasetPresence(datasetService, table);
-          if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-            verifyTablePresence(datasetService, table);
-          }
-          if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-            verifyTableNotExistOrEmpty(datasetService, table);
-          }
-        }
-
-        if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
-          // We will use BigQuery's streaming write API -- validate supported dispositions.
-          if (tableRefFunction != null) {
-            checkArgument(
-                createDisposition != CreateDisposition.CREATE_NEVER,
-                "CreateDisposition.CREATE_NEVER is not supported when using a tablespec"
-                + " function.");
-          }
-          if (jsonSchema == null) {
-            checkArgument(
-                createDisposition == CreateDisposition.CREATE_NEVER,
-                "CreateDisposition.CREATE_NEVER must be used if jsonSchema is null.");
-          }
-
-          checkArgument(
-              writeDisposition != WriteDisposition.WRITE_TRUNCATE,
-              "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
-                  + " when using a tablespec function.");
-        } else {
-          // We will use a BigQuery load job -- validate the temp location.
-          String tempLocation = options.getTempLocation();
-          checkArgument(
-              !Strings.isNullOrEmpty(tempLocation),
-              "BigQueryIO.Write needs a GCS temp location to store temp files.");
-          if (bigQueryServices == null) {
-            try {
-              GcsPath.fromUri(tempLocation);
-            } catch (IllegalArgumentException e) {
-              throw new IllegalArgumentException(
-                  String.format(
-                      "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
-                      tempLocation),
-                  e);
-            }
-          }
+      @FinishBundle
+      public void finishBundle(Context c) throws Exception {
+        if (writer != null) {
+          c.output(writer.close());
+          writer = null;
         }
       }
 
       @Override
-      public PDone expand(PCollection<TableRow> input) {
-        Pipeline p = input.getPipeline();
-        BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
-        BigQueryServices bqServices = getBigQueryServices();
-
-        // When writing an Unbounded PCollection, or when a tablespec function is defined, we use
-        // StreamWithDeDup and BigQuery's streaming import API.
-        if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) {
-          return input.apply(
-              new StreamWithDeDup(getTable(), tableRefFunction,
-                  jsonSchema == null ? null : NestedValueProvider.of(
-                      jsonSchema, new JsonSchemaToTableSchema()),
-                  createDisposition,
-                  tableDescription,
-                  bqServices));
-        }
-
-        ValueProvider<TableReference> table = getTableWithDefaultProject(options);
-
-        String stepUuid = randomUUIDString();
-
-        String tempLocation = options.getTempLocation();
-        String tempFilePrefix;
-        try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-          tempFilePrefix = factory.resolve(
-                  factory.resolve(tempLocation, "BigQueryWriteTemp"),
-                  stepUuid);
-        } catch (IOException e) {
-          throw new RuntimeException(
-              String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
-              e);
-        }
-
-        // Create a singleton job ID token at execution time.
-        PCollection<String> singleton = p.apply("Create", Create.of(tempFilePrefix));
-        PCollectionView<String> jobIdTokenView = p
-            .apply("TriggerIdCreation", Create.of("ignored"))
-            .apply("CreateJobId", MapElements.via(
-                new SimpleFunction<String, String>() {
-                  @Override
-                  public String apply(String input) {
-                    return randomUUIDString();
-                  }
-                }))
-            .apply(View.<String>asSingleton());
-
-        PCollection<TableRow> inputInGlobalWindow =
-            input.apply(
-                Window.<TableRow>into(new GlobalWindows())
-                    .triggering(DefaultTrigger.of())
-                    .discardingFiredPanes());
-
-        PCollection<KV<String, Long>> results = inputInGlobalWindow
-            .apply("WriteBundles",
-                ParDo.of(new WriteBundles(tempFilePrefix)));
-
-        TupleTag<KV<Long, List<String>>> multiPartitionsTag =
-            new TupleTag<KV<Long, List<String>>>("multiPartitionsTag") {};
-        TupleTag<KV<Long, List<String>>> singlePartitionTag =
-            new TupleTag<KV<Long, List<String>>>("singlePartitionTag") {};
-
-        PCollectionView<Iterable<KV<String, Long>>> resultsView = results
-            .apply("ResultsView", View.<KV<String, Long>>asIterable());
-        PCollectionTuple partitions = singleton.apply(ParDo
-            .of(new WritePartition(
-                resultsView,
-                multiPartitionsTag,
-                singlePartitionTag))
-            .withSideInputs(resultsView)
-            .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-
-        // Write multiple partitions to separate temporary tables
-        PCollection<String> tempTables = partitions.get(multiPartitionsTag)
-            .apply("MultiPartitionsGroupByKey", GroupByKey.<Long, List<String>>create())
-            .apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(
-                false,
-                bqServices,
-                jobIdTokenView,
-                tempFilePrefix,
-                NestedValueProvider.of(table, new TableRefToJson()),
-                jsonSchema,
-                WriteDisposition.WRITE_EMPTY,
-                CreateDisposition.CREATE_IF_NEEDED,
-                tableDescription))
-            .withSideInputs(jobIdTokenView));
-
-        PCollectionView<Iterable<String>> tempTablesView = tempTables
-            .apply("TempTablesView", View.<String>asIterable());
-        singleton.apply(ParDo
-            .of(new WriteRename(
-                bqServices,
-                jobIdTokenView,
-                NestedValueProvider.of(table, new TableRefToJson()),
-                writeDisposition,
-                createDisposition,
-                tempTablesView,
-                tableDescription))
-            .withSideInputs(tempTablesView, jobIdTokenView));
-
-        // Write single partition to final table
-        partitions.get(singlePartitionTag)
-            .apply("SinglePartitionGroupByKey", GroupByKey.<Long, List<String>>create())
-            .apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(
-                true,
-                bqServices,
-                jobIdTokenView,
-                tempFilePrefix,
-                NestedValueProvider.of(table, new TableRefToJson()),
-                jsonSchema,
-                writeDisposition,
-                createDisposition,
-                tableDescription))
-            .withSideInputs(jobIdTokenView));
+      public void populateDisplayData(DisplayData.Builder builder) {
+        super.populateDisplayData(builder);
 
-        return PDone.in(input.getPipeline());
+        builder
+            .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
+                .withLabel("Temporary File Prefix"));
       }
+    }
 
-      private static class WriteBundles extends DoFn<TableRow, KV<String, Long>> {
-        private transient TableRowWriter writer = null;
-        private final String tempFilePrefix;
-
-        WriteBundles(String tempFilePrefix) {
-          this.tempFilePrefix = tempFilePrefix;
-        }
-
-        @ProcessElement
-        public void processElement(ProcessContext c) throws Exception {
-          if (writer == null) {
-            writer = new TableRowWriter(tempFilePrefix);
-            writer.open(UUID.randomUUID().toString());
-            LOG.debug("Done opening writer {}", writer);
-          }
-          try {
-            writer.write(c.element());
-          } catch (Exception e) {
-            // Discard write result and close the write.
-            try {
-              writer.close();
-              // The writer does not need to be reset, as this DoFn cannot be reused.
-            } catch (Exception closeException) {
-              // Do not mask the exception that caused the write to fail.
-              e.addSuppressed(closeException);
-            }
-            throw e;
-          }
-        }
-
-        @FinishBundle
-        public void finishBundle(Context c) throws Exception {
-          if (writer != null) {
-            c.output(writer.close());
-            writer = null;
-          }
-        }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-        @Override
-        public void populateDisplayData(DisplayData.Builder builder) {
-          super.populateDisplayData(builder);
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
 
-          builder
-              .addIfNotNull(DisplayData.item("tempFilePrefix", tempFilePrefix)
-                  .withLabel("Temporary File Prefix"));
-        }
-      }
+      builder
+          .addIfNotNull(DisplayData.item("table", jsonTableRef)
+            .withLabel("Table Reference"))
+          .addIfNotNull(DisplayData.item("schema", jsonSchema)
+            .withLabel("Table Schema"));
 
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
+      if (tableRefFunction != null) {
+        builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+          .withLabel("Table Reference Function"));
       }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-
-        builder
-            .addIfNotNull(DisplayData.item("table", jsonTableRef)
-              .withLabel("Table Reference"))
-            .addIfNotNull(DisplayData.item("schema", jsonSchema)
-              .withLabel("Table Schema"));
-
-        if (tableRefFunction != null) {
-          builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
-            .withLabel("Table Reference Function"));
-        }
+      builder
+          .add(DisplayData.item("createDisposition", createDisposition.toString())
+            .withLabel("Table CreateDisposition"))
+          .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+            .withLabel("Table WriteDisposition"))
+          .addIfNotDefault(DisplayData.item("validation", validate)
+            .withLabel("Validation Enabled"), true)
+          .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
+            .withLabel("Table Description"));
+    }
 
-        builder
-            .add(DisplayData.item("createDisposition", createDisposition.toString())
-              .withLabel("Table CreateDisposition"))
-            .add(DisplayData.item("writeDisposition", writeDisposition.toString())
-              .withLabel("Table WriteDisposition"))
-            .addIfNotDefault(DisplayData.item("validation", validate)
-              .withLabel("Validation Enabled"), true)
-            .addIfNotNull(DisplayData.item("tableDescription", tableDescription)
-              .withLabel("Table Description"));
-      }
+    /** Returns the create disposition. */
+    public CreateDisposition getCreateDisposition() {
+      return createDisposition;
+    }
 
-      /** Returns the create disposition. */
-      public CreateDisposition getCreateDisposition() {
-        return createDisposition;
-      }
+    /** Returns the write disposition. */
+    public WriteDisposition getWriteDisposition() {
+      return writeDisposition;
+    }
 
-      /** Returns the write disposition. */
-      public WriteDisposition getWriteDisposition() {
-        return writeDisposition;
-      }
+    /** Returns the table schema. */
+    public TableSchema getSchema() {
+      return fromJsonString(
+          jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+    }
 
-      /** Returns the table schema. */
-      public TableSchema getSchema() {
-        return fromJsonString(
-            jsonSchema == null ? null : jsonSchema.get(), TableSchema.class);
+    /**
+     * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
+     *
+     * <p>If the table's project is not specified, use the executing project.
+     */
+    @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+        BigQueryOptions bqOptions) {
+      ValueProvider<TableReference> table = getTable();
+      if (table == null) {
+        return table;
       }
-
-      /**
-       * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
-       *
-       * <p>If the table's project is not specified, use the executing project.
-       */
-      @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
-          BigQueryOptions bqOptions) {
-        ValueProvider<TableReference> table = getTable();
-        if (table == null) {
-          return table;
-        }
-        if (!table.isAccessible()) {
-          LOG.info("Using a dynamic value for table input. This must contain a project"
-              + " in the table reference: {}", table);
-          return table;
-        }
-        if (Strings.isNullOrEmpty(table.get().getProjectId())) {
-          // If user does not specify a project we assume the table to be located in
-          // the default project.
-          TableReference tableRef = table.get();
-          tableRef.setProjectId(bqOptions.getProject());
-          return NestedValueProvider.of(StaticValueProvider.of(
-              toJsonString(tableRef)), new JsonTableRefToTableRef());
-        }
+      if (!table.isAccessible()) {
+        LOG.info("Using a dynamic value for table input. This must contain a project"
+            + " in the table reference: {}", table);
         return table;
       }
-
-      /** Returns the table reference, or {@code null}. */
-      @Nullable
-      public ValueProvider<TableReference> getTable() {
-        return jsonTableRef == null ? null :
-            NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+      if (Strings.isNullOrEmpty(table.get().getProjectId())) {
+        // If the user does not specify a project, assume the table is located
+        // in the default project.
+        TableReference tableRef = table.get();
+        tableRef.setProjectId(bqOptions.getProject());
+        return NestedValueProvider.of(StaticValueProvider.of(
+            toJsonString(tableRef)), new JsonTableRefToTableRef());
       }
+      return table;
+    }
 
-      /** Returns {@code true} if table validation is enabled. */
-      public boolean getValidate() {
-        return validate;
-      }
+    /** Returns the table reference, or {@code null}. */
+    @Nullable
+    public ValueProvider<TableReference> getTable() {
+      return jsonTableRef == null ? null :
+          NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableRef());
+    }
 
-      private BigQueryServices getBigQueryServices() {
-        if (bigQueryServices == null) {
-          bigQueryServices = new BigQueryServicesImpl();
-        }
-        return bigQueryServices;
+    /** Returns {@code true} if table validation is enabled. */
+    public boolean getValidate() {
+      return validate;
+    }
+
+    private BigQueryServices getBigQueryServices() {
+      if (bigQueryServices == null) {
+        bigQueryServices = new BigQueryServicesImpl();
       }
+      return bigQueryServices;
     }
 
     static class TableRowWriter {
@@ -2231,8 +2180,8 @@ public class BigQueryIO {
         List<String> currResults = Lists.newArrayList();
         for (int i = 0; i < results.size(); ++i) {
           KV<String, Long> fileResult = results.get(i);
-          if (currNumFiles + 1 > Bound.MAX_NUM_FILES
-              || currSizeBytes + fileResult.getValue() > Bound.MAX_SIZE_BYTES) {
+          if (currNumFiles + 1 > Write.MAX_NUM_FILES
+              || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) {
             c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
             currResults = Lists.newArrayList();
             currNumFiles = 0;
@@ -2331,13 +2280,13 @@ public class BigQueryIO {
 
         String projectId = ref.getProjectId();
         Job lastFailedLoadJob = null;
-        for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+        for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
           String jobId = jobIdPrefix + "-" + i;
           JobReference jobRef = new JobReference()
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startLoadJob(jobRef, loadConfig);
-          Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+          Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
           Status jobStatus = parseStatus(loadJob);
           switch (jobStatus) {
             case SUCCEEDED:
@@ -2361,7 +2310,7 @@ public class BigQueryIO {
             "Failed to create load job with id prefix %s, "
                 + "reached max retries: %d, last failed load job: %s.",
             jobIdPrefix,
-            Bound.MAX_RETRY_JOBS,
+            Write.MAX_RETRY_JOBS,
             jobToPrettyString(lastFailedLoadJob)));
       }
 
@@ -2477,13 +2426,13 @@ public class BigQueryIO {
 
         String projectId = ref.getProjectId();
         Job lastFailedCopyJob = null;
-        for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) {
+        for (int i = 0; i < Write.MAX_RETRY_JOBS; ++i) {
           String jobId = jobIdPrefix + "-" + i;
           JobReference jobRef = new JobReference()
               .setProjectId(projectId)
               .setJobId(jobId);
           jobService.startCopyJob(jobRef, copyConfig);
-          Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES);
+          Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
           Status jobStatus = parseStatus(copyJob);
           switch (jobStatus) {
             case SUCCEEDED:
@@ -2507,7 +2456,7 @@ public class BigQueryIO {
             "Failed to create copy job with id prefix %s, "
                 + "reached max retries: %d, last failed copy job: %s.",
             jobIdPrefix,
-            Bound.MAX_RETRY_JOBS,
+            Write.MAX_RETRY_JOBS,
             jobToPrettyString(lastFailedCopyJob)));
       }
 
@@ -2536,9 +2485,6 @@ public class BigQueryIO {
                 .withLabel("Create Disposition"));
       }
     }
-
-    /** Disallow construction of utility class. */
-    private Write() {}
   }
 
   private static String jobToPrettyString(@Nullable Job job) throws IOException {
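
The WritePartition hunk above batches the written files into load-job
partitions against the MAX_NUM_FILES and MAX_SIZE_BYTES limits now hosted on
Write. A standalone sketch of that greedy batching rule, simplified to plain
file sizes (the List shapes are an assumption here, not the transform's
actual KV<String, Long> result type):

    import java.util.ArrayList;
    import java.util.List;

    class PartitionSketch {
      static final int MAX_NUM_FILES = 10000;             // per-partition file cap
      static final long MAX_SIZE_BYTES = 11 * (1L << 40); // 11 TiB, under BQ's 12 TiB load limit

      // Start a new partition whenever adding the next file would exceed
      // either limit, mirroring WritePartition's loop.
      static List<List<Long>> partition(List<Long> fileSizes) {
        List<List<Long>> partitions = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
          if (!current.isEmpty()
              && (current.size() + 1 > MAX_NUM_FILES
                  || currentBytes + size > MAX_SIZE_BYTES)) {
            partitions.add(current);
            current = new ArrayList<>();
            currentBytes = 0;
          }
          current.add(size);
          currentBytes += size;
        }
        if (!current.isEmpty()) {
          partitions.add(current);
        }
        return partitions;
      }
    }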
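The load-job and copy-job hunks share one retry shape: up to
Write.MAX_RETRY_JOBS attempts, each submitted under a deterministic
per-attempt job ID and polled to completion (LOAD_JOB_POLL_MAX_RETRIES is
Integer.MAX_VALUE, so polling blocks until the job finishes). A condensed
sketch of that control flow; JobService here is a stand-in interface, not
Beam's BigQueryServices API:

    class RetrySketch {
      interface JobService {
        void startJob(String jobId) throws Exception;
        // Returns true if the job succeeded, false if it failed.
        boolean pollUntilDone(String jobId, int maxPollRetries) throws Exception;
      }

      static void runWithRetries(JobService service, String jobIdPrefix,
          int maxRetryJobs) throws Exception {
        for (int i = 0; i < maxRetryJobs; ++i) {
          // Per-attempt job ID, matching the jobIdPrefix + "-" + i pattern
          // used by both the load and copy paths in the diff.
          String jobId = jobIdPrefix + "-" + i;
          service.startJob(jobId);
          if (service.pollUntilDone(jobId, Integer.MAX_VALUE)) {
            return; // SUCCEEDED
          }
          // FAILED: fall through and retry under the next job ID.
        }
        throw new RuntimeException(String.format(
            "Failed to create job with id prefix %s, reached max retries: %d.",
            jobIdPrefix, maxRetryJobs));
      }
    }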

http://git-wip-us.apache.org/repos/asf/beam/blob/7d1f4400/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index bb1528b..f403c5a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -690,11 +690,11 @@ public class BigQueryIOTest implements Serializable {
   }
 
   private void checkWriteObject(
-      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+      BigQueryIO.Write write, String project, String dataset, String table,
       TableSchema schema, CreateDisposition createDisposition,
       WriteDisposition writeDisposition, String tableDescription) {
     checkWriteObjectWithValidate(
-        bound,
+        write,
         project,
         dataset,
         table,
@@ -706,17 +706,17 @@ public class BigQueryIOTest implements Serializable {
   }
 
   private void checkWriteObjectWithValidate(
-      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+      BigQueryIO.Write write, String project, String dataset, String table,
       TableSchema schema, CreateDisposition createDisposition,
       WriteDisposition writeDisposition, String tableDescription, boolean validate) {
-    assertEquals(project, bound.getTable().get().getProjectId());
-    assertEquals(dataset, bound.getTable().get().getDatasetId());
-    assertEquals(table, bound.getTable().get().getTableId());
-    assertEquals(schema, bound.getSchema());
-    assertEquals(createDisposition, bound.createDisposition);
-    assertEquals(writeDisposition, bound.writeDisposition);
-    assertEquals(tableDescription, bound.tableDescription);
-    assertEquals(validate, bound.validate);
+    assertEquals(project, write.getTable().get().getProjectId());
+    assertEquals(dataset, write.getTable().get().getDatasetId());
+    assertEquals(table, write.getTable().get().getTableId());
+    assertEquals(schema, write.getSchema());
+    assertEquals(createDisposition, write.createDisposition);
+    assertEquals(writeDisposition, write.writeDisposition);
+    assertEquals(tableDescription, write.tableDescription);
+    assertEquals(validate, write.validate);
   }
 
   @Before
@@ -1328,10 +1328,10 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWrite() {
-    BigQueryIO.Write.Bound bound =
+    BigQueryIO.Write write =
             BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
@@ -1355,7 +1355,7 @@ public class BigQueryIOTest implements Serializable {
     options.as(StreamingOptions.class).setStreaming(streaming);
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
 
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("project:dataset.table")
         .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
         .withTestServices(new FakeBigQueryServices()
@@ -1375,10 +1375,10 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildWriteWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Write.Bound bound =
+    BigQueryIO.Write write =
         BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
     checkWriteObjectWithValidate(
-        bound,
+        write,
         "foo.com:project",
         "somedataset",
         "sometable",
@@ -1391,9 +1391,9 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildWriteDefaultProject() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
+    BigQueryIO.Write write = BigQueryIO.Write.to("somedataset.sometable");
     checkWriteObject(
-        bound, null, "somedataset", "sometable",
+        write, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
@@ -1403,89 +1403,80 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
+    BigQueryIO.Write write = BigQueryIO.Write.to(table);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
-  @Category(NeedsRunner.class)
-  public void testBuildWriteWithoutTable() {
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("must set the table reference");
-    p.apply(Create.empty(TableRowJsonCoder.of())).apply(BigQueryIO.Write.withoutValidation());
-  }
-
-  @Test
   public void testBuildWriteWithSchema() {
     TableSchema schema = new TableSchema();
-    BigQueryIO.Write.Bound bound =
+    BigQueryIO.Write write =
         BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithCreateDispositionNever() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_NEVER);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithCreateDispositionIfNeeded() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithWriteDispositionTruncate() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE, null);
   }
 
   @Test
   public void testBuildWriteWithWriteDispositionAppend() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_APPEND);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND, null);
   }
 
   @Test
   public void testBuildWriteWithWriteDispositionEmpty() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
     checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
+        write, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, null);
   }
 
   @Test
   public void testBuildWriteWithWriteWithTableDescription() {
     final String tblDescription = "foo bar table";
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withTableDescription(tblDescription);
     checkWriteObject(
-        bound,
+        write,
         "foo.com:project",
         "somedataset",
         "sometable",
@@ -1501,7 +1492,7 @@ public class BigQueryIOTest implements Serializable {
     TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
     final String tblDescription = "foo bar table";
 
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to(tableSpec)
         .withSchema(schema)
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
@@ -1702,35 +1693,6 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testWriteValidateFailsTableAndTableSpec() {
-    p.enableAbandonedNodeEnforcement(false);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Cannot set both a table reference and a table function");
-    p
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write
-            .to("dataset.table")
-            .to(new SerializableFunction<BoundedWindow, String>() {
-              @Override
-              public String apply(BoundedWindow input) {
-                return null;
-              }
-            }));
-  }
-
-  @Test
-  public void testWriteValidateFailsNoTableAndNoTableSpec() {
-    p.enableAbandonedNodeEnforcement(false);
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
-    p
-        .apply(Create.empty(TableRowJsonCoder.of()))
-        .apply("name", BigQueryIO.Write.withoutValidation());
-  }
-
-  @Test
   public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
     FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
         .withJobService(mockJobService)
@@ -2094,7 +2056,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWritePartitionSinglePartition() throws Exception {
-    long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES;
+    long numFiles = BigQueryIO.Write.MAX_NUM_FILES;
     long fileSize = 1;
 
     // One partition is needed.
@@ -2104,7 +2066,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testWritePartitionManyFiles() throws Exception {
-    long numFiles = BigQueryIO.Write.Bound.MAX_NUM_FILES * 3;
+    long numFiles = BigQueryIO.Write.MAX_NUM_FILES * 3;
     long fileSize = 1;
 
     // One partition is needed for each group of BigQueryIO.Write.MAX_NUM_FILES files.
@@ -2115,7 +2077,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testWritePartitionLargeFileSize() throws Exception {
     long numFiles = 10;
-    long fileSize = BigQueryIO.Write.Bound.MAX_SIZE_BYTES / 3;
+    long fileSize = BigQueryIO.Write.MAX_SIZE_BYTES / 3;
 
     // One partition is needed for each group of three files.
     long expectedNumPartitions = 4;
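
All three partition expectations above follow from one greedy rule: a file
joins the current partition unless adding it would push the partition past
MAX_NUM_FILES files or MAX_SIZE_BYTES bytes. A minimal sketch of that rule
(not the actual WritePartition code), with the three test cases worked out
in the trailing comments:

    static int countPartitions(long numFiles, long fileSize) {
      int partitions = 0;
      long filesInPartition = 0;
      long bytesInPartition = 0;
      for (long i = 0; i < numFiles; ++i) {
        if (filesInPartition + 1 > Write.MAX_NUM_FILES
            || bytesInPartition + fileSize > Write.MAX_SIZE_BYTES) {
          ++partitions;  // the current partition is full; start a new one
          filesInPartition = 0;
          bytesInPartition = 0;
        }
        ++filesInPartition;
        bytesInPartition += fileSize;
      }
      return partitions + (filesInPartition > 0 ? 1 : 0);
    }
    // countPartitions(MAX_NUM_FILES, 1)        == 1  (exactly at the limit)
    // countPartitions(3 * MAX_NUM_FILES, 1)    == 3  (split by file count)
    // countPartitions(10, MAX_SIZE_BYTES / 3)  == 4  (3+3+3+1, split by size)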
@@ -2382,7 +2344,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
+    BigQueryIO.Write write = BigQueryIO.Write
         .to(options.getOutputTable())
         .withSchema(NestedValueProvider.of(
             options.getOutputSchema(), new JsonSchemaToTableSchema()))