You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:03:03 UTC

[08/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
deleted file mode 100644
index 9141f39..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ /dev/null
@@ -1,2447 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.coders.AtomicCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.Coder.Context;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.AvroUtils;
-import org.apache.beam.sdk.util.BigQueryServices;
-import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.util.BigQueryServices.JobService;
-import org.apache.beam.sdk.util.BigQueryServicesImpl;
-import org.apache.beam.sdk.util.BigQueryTableInserter;
-import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.MimeTypes;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.Reshuffle;
-import org.apache.beam.sdk.util.SystemDoFnInternal;
-import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.util.gcsfs.GcsPath;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-
-import com.google.api.client.json.JsonFactory;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.avro.generic.GenericRecord;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.annotation.Nullable;
-
-/**
- * {@link PTransform}s for reading and writing
- * <a href="https://developers.google.com/bigquery/">BigQuery</a> tables.
- *
- * <h3>Table References</h3>
- * <p>A fully-qualified BigQuery table name consists of three components:
- * <ul>
- *   <li>{@code projectId}: the Cloud project id (defaults to
- *       {@link GcpOptions#getProject()}).
- *   <li>{@code datasetId}: the BigQuery dataset id, unique within a project.
- *   <li>{@code tableId}: a table id, unique within a dataset.
- * </ul>
- *
- * <p>BigQuery table references are stored as a {@link TableReference}, which comes
- * from the <a href="https://cloud.google.com/bigquery/client-libraries">
- * BigQuery Java Client API</a>.
- * Tables can be referred to as Strings, with or without the {@code projectId}.
- * A helper function is provided ({@link BigQueryIO#parseTableSpec(String)})
- * that parses the following string forms into a {@link TableReference}:
- *
- * <ul>
- *   <li>[{@code project_id}]:[{@code dataset_id}].[{@code table_id}]
- *   <li>[{@code dataset_id}].[{@code table_id}]
- * </ul>
- *
- * <h3>Reading</h3>
- * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation.
- * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
- * <pre>{@code
- * PCollection<TableRow> shakespeare = pipeline.apply(
- *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
- * }</pre>
- *
- * <p>See {@link TableRow} for more information on the {@link TableRow} object.
- *
- * <p>Users may provide a query to read from rather than reading all of a BigQuery table. If
- * specified, the result obtained by executing the specified query will be used as the data of the
- * input transform.
- *
- * <pre>{@code
- * PCollection<TableRow> shakespeare = pipeline.apply(
- *     BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
- * }</pre>
- *
- * <p>When creating a BigQuery input transform, users should provide either a query or a table.
- * Pipeline construction will fail with a validation error if neither or both are specified.
- *
- * <h3>Writing</h3>
- * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation.
- * This consumes a {@link PCollection} of {@link TableRow TableRows} as input.
- * <pre>{@code
- * PCollection<TableRow> quotes = ...
- *
- * List<TableFieldSchema> fields = new ArrayList<>();
- * fields.add(new TableFieldSchema().setName("source").setType("STRING"));
- * fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
- * TableSchema schema = new TableSchema().setFields(fields);
- *
- * quotes.apply(BigQueryIO.Write
- *     .to("my-project:output.output_table")
- *     .withSchema(schema)
- *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
- * }</pre>
- *
- * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should
- * append to an existing table, replace the table, or verify that the table is
- * empty. Note that the dataset being written to must already exist. Unbounded PCollections can only
- * be written using {@link WriteDisposition#WRITE_EMPTY} or {@link WriteDisposition#WRITE_APPEND}.
- *
- * <h3>Sharding BigQuery output tables</h3>
- * <p>A common use case is to dynamically generate BigQuery table names based on
- * the current window. To support this,
- * {@link BigQueryIO.Write#to(SerializableFunction)}
- * accepts a function mapping the current window to a tablespec. For example,
- * here's code that outputs daily tables to BigQuery:
- * <pre>{@code
- * PCollection<TableRow> quotes = ...
- * quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
- *       .apply(BigQueryIO.Write
- *         .withSchema(schema)
- *         .to(new SerializableFunction<BoundedWindow, String>() {
- *           public String apply(BoundedWindow window) {
- *             // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
- *             String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
- *                  .withZone(DateTimeZone.UTC)
- *                  .print(((IntervalWindow) window).start());
- *             return "my-project:output.output_table_" + dayString;
- *           }
- *         }));
- * }</pre>
- *
- * <p>Per-window tables are not yet supported in batch mode.
- *
- * <h3>Permissions</h3>
- * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding {@link PipelineRunner}s for
- * more details.
- *
- * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
- * </a> for security and permission related information specific to BigQuery.
- */
-public class BigQueryIO {
-  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
-
-  /**
-   * Singleton instance of the JSON factory used to read and write JSON
-   * formatted rows.
-   */
-  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
-
-  /**
-   * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
-   * IDs must start with a letter and may not end with a dash.
-   * This regex isn't exact - this allows for patterns that would be rejected by
-   * the service, but this is sufficient for basic parsing of table references.
-   */
-  private static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]{4,61}[a-z0-9]";
-
-  /**
-   * Regular expression that matches Dataset IDs.
-   */
-  private static final String DATASET_REGEXP = "[-\\w.]{1,1024}";
-
-  /**
-   * Regular expression that matches Table IDs.
-   */
-  private static final String TABLE_REGEXP = "[-\\w$@]{1,1024}";
-
-  /**
-   * Matches table specifications in the form {@code "[project_id]:[dataset_id].[table_id]"} or
-   * {@code "[dataset_id].[table_id]"}.
-   */
-  private static final String DATASET_TABLE_REGEXP =
-      String.format("((?<PROJECT>%s):)?(?<DATASET>%s)\\.(?<TABLE>%s)", PROJECT_ID_REGEXP,
-          DATASET_REGEXP, TABLE_REGEXP);
-
-  private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
-
-  // TODO: make this private and remove improper access from BigQueryIOTranslator.
-  public static final String SET_PROJECT_FROM_OPTIONS_WARNING =
-      "No project specified for BigQuery table \"%1$s.%2$s\". Assuming it is in \"%3$s\". If the"
-      + " table is in a different project please specify it as a part of the BigQuery table"
-      + " definition.";
-
-  private static final String RESOURCE_NOT_FOUND_ERROR =
-      "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
-          + " execution. If the %1$s is created by an earlier stage of the pipeline, this"
-          + " validation can be disabled using #withoutValidation.";
-
-  private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR =
-      "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by"
-          + " an earlier stage of the pipeline, this validation can be disabled using"
-          + " #withoutValidation.";
-
-  /**
-   * Parses a table specification in the form
-   * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}.
-   *
-   * <p>If the project id is omitted, the project id of the returned {@link TableReference} is
-   * left unset ({@code null}); this method does not fill in a default project.
-   *
-   * @param tableSpec the table specification string to parse
-   * @return a {@link TableReference} holding the parsed project (possibly {@code null}), dataset,
-   *     and table ids
-   * @throws IllegalArgumentException if {@code tableSpec} does not match the expected format
-   */
-  public static TableReference parseTableSpec(String tableSpec) {
-    Matcher match = TABLE_SPEC.matcher(tableSpec);
-    if (!match.matches()) {
-      throw new IllegalArgumentException(
-          "Table reference is not in [project_id]:[dataset_id].[table_id] "
-          + "format: " + tableSpec);
-    }
-
-    TableReference ref = new TableReference();
-    // The PROJECT group is optional in the pattern; group() yields null when it did not match.
-    ref.setProjectId(match.group("PROJECT"));
-
-    return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
-  }
-
-  /**
-   * Renders a {@link TableReference} in its canonical string form:
-   * {@code "[project_id]:[dataset_id].[table_id]"}, or {@code "[dataset_id].[table_id]"} when the
-   * reference carries no project id.
-   */
-  public static String toTableSpec(TableReference ref) {
-    String projectId = ref.getProjectId();
-    String projectPrefix = (projectId == null) ? "" : projectId + ":";
-    return projectPrefix + ref.getDatasetId() + "." + ref.getTableId();
-  }
-
-  /**
-   * A {@link PTransform} that reads from a BigQuery table and returns a
-   * {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table.
-   *
-   * <p>Each {@link TableRow} contains values indexed by column name. Here is a
-   * sample processing function that processes a "line" column from rows:
-   * <pre>{@code
-   * static class ExtractWordsFn extends DoFn<TableRow, String> {
-   *   public void processElement(ProcessContext c) {
-   *     // Get the "line" field of the TableRow object, split it into words, and emit them.
-   *     TableRow row = c.element();
-   *     String[] words = row.get("line").toString().split("[^a-zA-Z']+");
-   *     for (String word : words) {
-   *       if (!word.isEmpty()) {
-   *         c.output(word);
-   *       }
-   *     }
-   *   }
-   * }}</pre>
-   */
-  public static class Read {
-
-    /**
-     * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
-     * {@code "[dataset_id].[table_id]"} for tables within the current project.
-     */
-    public static Bound from(String tableSpec) {
-      return new Bound().from(tableSpec);
-    }
-
-    /**
-     * Reads results received after executing the given query.
-     */
-    public static Bound fromQuery(String query) {
-      return new Bound().fromQuery(query);
-    }
-
-    /**
-     * Reads a BigQuery table specified as a {@link TableReference} object.
-     */
-    public static Bound from(TableReference table) {
-      return new Bound().from(table);
-    }
-
-    /**
-     * Disables BigQuery table validation, which is enabled by default.
-     */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
-    }
-
-    /**
-     * A {@link PTransform} that reads from a BigQuery table and returns a bounded
-     * {@link PCollection} of {@link TableRow TableRows}.
-     */
-    public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
-      @Nullable final String jsonTableRef;
-      @Nullable final String query;
-
-      /**
-       * Whether to validate, prior to pipeline submission, that the table exists or the query
-       * succeeds. Basic validation (such as ensuring that a query or table is specified) occurs
-       * regardless of this setting.
-       */
-      final boolean validate;
-      @Nullable final Boolean flattenResults;
-      @Nullable BigQueryServices bigQueryServices;
-
-      private static final String QUERY_VALIDATION_FAILURE_ERROR =
-          "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
-          + " pipeline, This validation can be disabled using #withoutValidation.";
-
-      // The maximum number of retries to poll a BigQuery job in the cleanup phase.
-      // We expect the jobs to be DONE already, so a high maximum number of retries is not needed.
-      private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10;
-
-      private Bound() {
-        this(
-            null /* name */,
-            null /* query */,
-            null /* jsonTableRef */,
-            true /* validate */,
-            null /* flattenResults */,
-            null /* bigQueryServices */);
-      }
-
-      private Bound(
-          String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate,
-          @Nullable Boolean flattenResults, @Nullable BigQueryServices bigQueryServices) {
-        super(name);
-        this.jsonTableRef = jsonTableRef;
-        this.query = query;
-        this.validate = validate;
-        this.flattenResults = flattenResults;
-        this.bigQueryServices = bigQueryServices;
-      }
-
-      /**
-       * Returns a copy of this transform that reads from the specified table. Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound from(String tableSpec) {
-        return from(parseTableSpec(tableSpec));
-      }
-
-      /**
-       * Returns a copy of this transform that reads from the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound from(TableReference table) {
-        return new Bound(
-            name, query, toJsonString(table), validate, flattenResults, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this transform that reads the results of the specified query.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>By default, the query results will be flattened -- see
-       * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-       * Jobs documentation</a> for more information.  To disable flattening, use
-       * {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
-       */
-      public Bound fromQuery(String query) {
-        return new Bound(name, query, jsonTableRef, validate,
-            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), bigQueryServices);
-      }
-
-      /**
-       * Disable validation that the table exists or the query succeeds prior to pipeline
-       * submission. Basic validation (such as ensuring that a query or table is specified) still
-       * occurs.
-       */
-      public Bound withoutValidation() {
-        return new Bound(name, query, jsonTableRef, false, flattenResults, bigQueryServices);
-      }
-
-      /**
-       * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-       * flattening of query results</a>.
-       *
-       * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
-       * from a table will cause an error during validation.
-       */
-      public Bound withoutResultFlattening() {
-        return new Bound(name, query, jsonTableRef, validate, false, bigQueryServices);
-      }
-
-      @VisibleForTesting
-      Bound withTestServices(BigQueryServices testServices) {
-        return new Bound(name, query, jsonTableRef, validate, flattenResults, testServices);
-      }
-
-      @Override
-      public void validate(PInput input) {
-        // Even if existence validation is disabled, we need to make sure that the BigQueryIO
-        // read is properly specified.
-        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-        TableReference table = getTableWithDefaultProject(bqOptions);
-        if (table == null && query == null) {
-          throw new IllegalStateException(
-              "Invalid BigQuery read operation, either table reference or query has to be set");
-        } else if (table != null && query != null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
-              + " query and a table, only one of these should be provided");
-        } else if (table != null && flattenResults != null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-              + " table with a result flattening preference, which is not configurable");
-        } else if (query != null && flattenResults == null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-              + " query without a result flattening preference");
-        }
-
-        if (validate) {
-          BigQueryServices bqServices = getBigQueryServices();
-          // Check for source table/query presence for early failure notification.
-          // Note that a presence check can fail if the table or dataset are created by earlier
-          // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these
-          // cases the withoutValidation method can be used to disable the check.
-          if (table != null) {
-            DatasetService datasetService = bqServices.getDatasetService(bqOptions);
-            verifyDatasetPresence(datasetService, table);
-            verifyTablePresence(datasetService, table);
-          }
-          if (query != null) {
-            JobService jobService = bqServices.getJobService(bqOptions);
-            try {
-              jobService.dryRunQuery(bqOptions.getProject(), query);
-            } catch (Exception e) {
-              throw new IllegalArgumentException(
-                  String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
-            }
-          }
-        }
-      }
-
-      /**
-       * Expands this transform into a bounded read of the configured table or query, followed by a
-       * cleanup step that removes the files produced by the extract job.
-       *
-       * <p>A random UUID is generated per application and is used to derive the BigQuery job id
-       * token, the extract destination directory under the pipeline's temp location and, for query
-       * reads, the ids of the temporary dataset and table.
-       *
-       * @param input the pipeline-level input this transform is applied to
-       * @return the rows read, with the coder from {@link #getDefaultOutputCoder()}
-       * @throws RuntimeException if the extract destination directory cannot be resolved from the
-       *     temp location; the underlying {@link IOException} is attached as the cause
-       */
-      @Override
-      public PCollection<TableRow> apply(PInput input) {
-        String uuid = randomUUIDString();
-        final String jobIdToken = "beam_job_" + uuid;
-
-        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-        BoundedSource<TableRow> source;
-        final BigQueryServices bqServices = getBigQueryServices();
-
-        final String extractDestinationDir;
-        String tempLocation = bqOptions.getTempLocation();
-        try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-          extractDestinationDir = factory.resolve(tempLocation, uuid);
-        } catch (IOException e) {
-          // Keep the original IOException as the cause so the root failure is not lost.
-          throw new RuntimeException(
-              String.format("Failed to resolve extract destination directory in %s", tempLocation),
-              e);
-        }
-
-        final String executingProject = bqOptions.getProject();
-        if (!Strings.isNullOrEmpty(query)) {
-          // Query reads go through a temporary table whose ids are derived from the UUID.
-          String queryTempDatasetId = "temp_dataset_" + uuid;
-          String queryTempTableId = "temp_table_" + uuid;
-
-          TableReference queryTempTableRef = new TableReference()
-              .setProjectId(executingProject)
-              .setDatasetId(queryTempDatasetId)
-              .setTableId(queryTempTableId);
-
-          source = BigQueryQuerySource.create(
-              jobIdToken, query, queryTempTableRef, flattenResults,
-              extractDestinationDir, bqServices);
-        } else {
-          TableReference inputTable = getTableWithDefaultProject(bqOptions);
-          source = BigQueryTableSource.create(
-              jobIdToken, inputTable, extractDestinationDir, bqServices, executingProject);
-        }
-        PassThroughThenCleanup.CleanupOperation cleanupOperation =
-            new PassThroughThenCleanup.CleanupOperation() {
-              @Override
-              void cleanup(PipelineOptions options) throws Exception {
-                BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-
-                JobReference jobRef = new JobReference()
-                    .setProjectId(executingProject)
-                    .setJobId(getExtractJobId(jobIdToken));
-                Job extractJob = bqServices.getJobService(bqOptions).pollJob(
-                    jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);
-
-                // Prefer the file list derived from the extract job; if the job cannot be found,
-                // fall back to listing whatever is present in the extract destination directory.
-                Collection<String> extractFiles = null;
-                if (extractJob != null) {
-                  extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
-                } else {
-                  IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
-                  Collection<String> dirMatch = factory.match(extractDestinationDir);
-                  if (!dirMatch.isEmpty()) {
-                    extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
-                  }
-                }
-                if (extractFiles != null && !extractFiles.isEmpty()) {
-                  new GcsUtilFactory().create(options).remove(extractFiles);
-                }
-              }};
-        return input.getPipeline()
-            .apply(org.apache.beam.sdk.io.Read.from(source))
-            .setCoder(getDefaultOutputCoder())
-            .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
-      }
-
-      @Override
-      protected Coder<TableRow> getDefaultOutputCoder() {
-        return TableRowJsonCoder.of();
-      }
-
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        TableReference table = getTable();
-
-        if (table != null) {
-          builder.add(DisplayData.item("table", toTableSpec(table))
-            .withLabel("Table"));
-        }
-
-        builder
-            .addIfNotNull(DisplayData.item("query", query)
-              .withLabel("Query"))
-            .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
-              .withLabel("Flatten Query Results"))
-            .addIfNotDefault(DisplayData.item("validation", validate)
-              .withLabel("Validation Enabled"),
-                true);
-      }
-
-      /**
-       * Returns the table to read, or {@code null} if reading from a query instead.
-       *
-       * <p>If the table's project is not specified, use the executing project.
-       */
-      @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
-        TableReference table = getTable();
-        if (table != null && Strings.isNullOrEmpty(table.getProjectId())) {
-          // If user does not specify a project we assume the table to be located in
-          // the default project.
-          table.setProjectId(bqOptions.getProject());
-        }
-        return table;
-      }
-
-      /**
-       * Returns the table to read, or {@code null} if reading from a query instead.
-       */
-      @Nullable
-      public TableReference getTable() {
-        return fromJsonString(jsonTableRef, TableReference.class);
-      }
-
-      /**
-       * Returns the query to be read, or {@code null} if reading from a table instead.
-       */
-      @Nullable
-      public String getQuery() {
-        return query;
-      }
-
-      /**
-       * Returns true if table validation is enabled.
-       */
-      public boolean getValidate() {
-        return validate;
-      }
-
-      /**
-       * Returns whether result flattening is enabled ({@code true}) or disabled ({@code false}),
-       * or {@code null} if not applicable (no flattening preference has been set).
-       */
-      @Nullable
-      public Boolean getFlattenResults() {
-        return flattenResults;
-      }
-
-      private BigQueryServices getBigQueryServices() {
-        if (bigQueryServices == null) {
-          bigQueryServices = new BigQueryServicesImpl();
-        }
-        return bigQueryServices;
-      }
-    }
-
-    /** Disallow construction of utility class. */
-    private Read() {}
-  }
-
-  /**
-   * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
-   * has been processed.
-   */
-  @VisibleForTesting
-  static class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private CleanupOperation cleanupOperation;
-
-    PassThroughThenCleanup(CleanupOperation cleanupOperation) {
-      this.cleanupOperation = cleanupOperation;
-    }
-
-    /**
-     * Passes {@code input} through an identity {@link ParDo} and adds a separate branch to the
-     * pipeline that invokes the {@link CleanupOperation}.
-     *
-     * @param input the collection to pass through unchanged
-     * @return the main output of the identity {@link ParDo}
-     */
-    @Override
-    public PCollection<T> apply(PCollection<T> input) {
-      TupleTag<T> mainOutput = new TupleTag<>();
-      TupleTag<Void> cleanupSignal = new TupleTag<>();
-      // The identity ParDo declares an extra Void-typed output; IdentityFn only emits to the
-      // main output, so nothing is ever written to the cleanup signal.
-      PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
-          .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
-
-      // View the signal as a singleton with a null default.
-      PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
-          .setCoder(VoidCoder.of())
-          .apply(View.<Void>asSingleton().withDefaultValue(null));
-
-      // The cleanup DoFn takes the view as a side input but never reads it.
-      // NOTE(review): the side input appears to exist only to order the cleanup after the
-      // identity ParDo has processed the input -- confirm against runner side-input semantics.
-      input.getPipeline()
-          .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
-          .apply("Cleanup", ParDo.of(
-              new DoFn<CleanupOperation, Void>() {
-                @Override
-                public void processElement(ProcessContext c)
-                    throws Exception {
-                  c.element().cleanup(c.getPipelineOptions());
-                }
-              }).withSideInputs(cleanupSignalView));
-
-      return outputs.get(mainOutput);
-    }
-
-    private static class IdentityFn<T> extends DoFn<T, T> {
-      @Override
-      public void processElement(ProcessContext c) {
-        c.output(c.element());
-      }
-    }
-
-    abstract static class CleanupOperation implements Serializable {
-      abstract void cleanup(PipelineOptions options) throws Exception;
-    }
-  }
-
-  /**
-   * A {@link BigQuerySourceBase} that reads the rows of a single BigQuery table.
-   *
-   * <p>The table reference is kept as a JSON string and parsed again whenever it is needed.
-   */
-  @VisibleForTesting
-  static class BigQueryTableSource extends BigQuerySourceBase {
-
-    /** Creates a source for {@code table}; all arguments are forwarded to the constructor. */
-    static BigQueryTableSource create(
-        String jobIdToken,
-        TableReference table,
-        String extractDestinationDir,
-        BigQueryServices bqServices,
-        String executingProject) {
-      return new BigQueryTableSource(
-          jobIdToken, table, extractDestinationDir, bqServices, executingProject);
-    }
-
-    // JSON form of the table reference being read.
-    private final String jsonTable;
-    // Holds the table size in bytes once it has been fetched; empty until then.
-    private final AtomicReference<Long> tableSizeBytes;
-
-    private BigQueryTableSource(
-        String jobIdToken,
-        TableReference table,
-        String extractDestinationDir,
-        BigQueryServices bqServices,
-        String executingProject) {
-      super(jobIdToken, extractDestinationDir, bqServices, executingProject);
-      this.jsonTable = toJsonString(checkNotNull(table, "table"));
-      this.tableSizeBytes = new AtomicReference<>();
-    }
-
-    /** Parses the stored JSON back into a {@link TableReference}. */
-    private TableReference parseTable() throws IOException {
-      return JSON_FACTORY.fromString(jsonTable, TableReference.class);
-    }
-
-    @Override
-    protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
-      // The table itself is extracted; no intermediate table is needed.
-      return parseTable();
-    }
-
-    @Override
-    public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
-      return new BigQueryReader(
-          this,
-          bqServices.getReaderFromTable(options.as(BigQueryOptions.class), parseTable()));
-    }
-
-    /**
-     * Returns the table size in bytes as reported by the dataset service, fetching it only
-     * while no value has been stored yet.
-     */
-    @Override
-    public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      Long knownSize = tableSizeBytes.get();
-      if (knownSize == null) {
-        TableReference ref = parseTable();
-
-        Long fetchedSize = bqServices.getDatasetService(options.as(BigQueryOptions.class))
-            .getTable(ref.getProjectId(), ref.getDatasetId(), ref.getTableId())
-            .getNumBytes();
-        tableSizeBytes.compareAndSet(null, fetchedSize);
-        knownSize = tableSizeBytes.get();
-      }
-      return knownSize;
-    }
-
-    @Override
-    protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
-      // Do nothing.
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("table", jsonTable));
-    }
-  }
-
-  /**
-   * A {@link BigQuerySourceBase} for querying BigQuery tables.
-   */
-  @VisibleForTesting
-  static class BigQueryQuerySource extends BigQuerySourceBase {
-
-    static BigQueryQuerySource create(
-        String jobIdToken,
-        String query,
-        TableReference queryTempTableRef,
-        Boolean flattenResults,
-        String extractDestinationDir,
-        BigQueryServices bqServices) {
-      return new BigQueryQuerySource(
-          jobIdToken,
-          query,
-          queryTempTableRef,
-          flattenResults,
-          extractDestinationDir,
-          bqServices);
-    }
-
-    private final String query;
-    private final String jsonQueryTempTable;
-    private final Boolean flattenResults;
-    private transient AtomicReference<JobStatistics> dryRunJobStats;
-
-    private BigQueryQuerySource(
-        String jobIdToken,
-        String query,
-        TableReference queryTempTableRef,
-        Boolean flattenResults,
-        String extractDestinationDir,
-        BigQueryServices bqServices) {
-      super(jobIdToken, extractDestinationDir, bqServices,
-          checkNotNull(queryTempTableRef, "queryTempTableRef").getProjectId());
-      this.query = checkNotNull(query, "query");
-      this.jsonQueryTempTable = toJsonString(queryTempTableRef);
-      this.flattenResults = checkNotNull(flattenResults, "flattenResults");
-      this.dryRunJobStats = new AtomicReference<>();
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
-    }
-
-    @Override
-    public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      return new BigQueryReader(this, bqServices.getReaderFromQuery(
-          bqOptions, query, executingProject, flattenResults));
-    }
-
-    @Override
-    protected TableReference getTableToExtract(BigQueryOptions bqOptions)
-        throws IOException, InterruptedException {
-      // 1. Find the location of the query.
-      TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions)
-          .getQuery()
-          .getReferencedTables()
-          .get(0);
-      DatasetService tableService = bqServices.getDatasetService(bqOptions);
-      String location = tableService.getTable(
-          dryRunTempTable.getProjectId(),
-          dryRunTempTable.getDatasetId(),
-          dryRunTempTable.getTableId()).getLocation();
-
-      // 2. Create the temporary dataset in the query location.
-      TableReference tableToExtract =
-          JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
-      tableService.createDataset(
-          tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
-
-      // 3. Execute the query.
-      String queryJobId = jobIdToken + "-query";
-      executeQuery(
-          executingProject,
-          queryJobId,
-          query,
-          tableToExtract,
-          flattenResults,
-          bqServices.getJobService(bqOptions));
-      return tableToExtract;
-    }
-
-    @Override
-    protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
-      TableReference tableToRemove =
-          JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
-
-      DatasetService tableService = bqServices.getDatasetService(bqOptions);
-      tableService.deleteTable(
-          tableToRemove.getProjectId(),
-          tableToRemove.getDatasetId(),
-          tableToRemove.getTableId());
-      tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("query", query));
-    }
-    private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
-        throws InterruptedException, IOException {
-      if (dryRunJobStats.get() == null) {
-        JobStatistics jobStats =
-            bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query);
-        dryRunJobStats.compareAndSet(null, jobStats);
-      }
-      return dryRunJobStats.get();
-    }
-
-    private static void executeQuery(
-        String executingProject,
-        String jobId,
-        String query,
-        TableReference destinationTable,
-        boolean flattenResults,
-        JobService jobService) throws IOException, InterruptedException {
-      JobReference jobRef = new JobReference()
-          .setProjectId(executingProject)
-          .setJobId(jobId);
-      JobConfigurationQuery queryConfig = new JobConfigurationQuery();
-      queryConfig
-          .setQuery(query)
-          .setAllowLargeResults(true)
-          .setCreateDisposition("CREATE_IF_NEEDED")
-          .setDestinationTable(destinationTable)
-          .setFlattenResults(flattenResults)
-          .setPriority("BATCH")
-          .setWriteDisposition("WRITE_EMPTY");
-      jobService.startQueryJob(jobRef, queryConfig);
-      Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
-      if (parseStatus(job) != Status.SUCCEEDED) {
-        throw new IOException("Query job failed: " + jobId);
-      }
-      return;
-    }
-
-    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
-      in.defaultReadObject();
-      dryRunJobStats = new AtomicReference<>();
-    }
-  }
-
-  /**
-   * An abstract {@link BoundedSource} to read a table from BigQuery.
-   *
-   * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
-   * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource},
-   * and {@link BigQueryQuerySource}, depending on the configuration of the read.
-   * Specifically,
-   * <ul>
-   * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
-   * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
-   * </ul>
-   */
-  private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
-    // The maximum number of attempts to verify temp files.
-    private static final int MAX_FILES_VERIFY_ATTEMPTS = 10;
-
-    // The maximum number of retries to poll a BigQuery job.
-    protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
-    // The initial backoff for verifying temp files.
-    private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
-
-    protected final String jobIdToken;
-    protected final String extractDestinationDir;
-    protected final BigQueryServices bqServices;
-    protected final String executingProject;
-
-    private BigQuerySourceBase(
-        String jobIdToken,
-        String extractDestinationDir,
-        BigQueryServices bqServices,
-        String executingProject) {
-      this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
-      this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-      this.executingProject = checkNotNull(executingProject, "executingProject");
-    }
-
-    /**
-     * Extracts the table returned by {@link #getTableToExtract} to Avro files and returns one
-     * source per file. {@code desiredBundleSizeBytes} is not used here.
-     */
-    @Override
-    public List<BoundedSource<TableRow>> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      TableReference tableToExtract = getTableToExtract(bqOptions);
-      JobService jobService = bqServices.getJobService(bqOptions);
-      String extractJobId = getExtractJobId(jobIdToken);
-      List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
-
-      // Read the schema before cleanup, which may remove the extracted table.
-      TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
-          tableToExtract.getProjectId(),
-          tableToExtract.getDatasetId(),
-          tableToExtract.getTableId()).getSchema();
-
-      cleanupTempResource(bqOptions);
-      return createSources(tempFiles, tableSchema);
-    }
-
-    /** Returns the table whose contents the extract job should export. */
-    protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
-
-    /** Removes any temporary BigQuery resources once the extract job has finished. */
-    protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return false;
-    }
-
-    @Override
-    public void validate() {
-      // Do nothing, validation is done in BigQuery.Read.
-    }
-
-    @Override
-    public Coder<TableRow> getDefaultOutputCoder() {
-      return TableRowJsonCoder.of();
-    }
-
-    /**
-     * Starts an Avro extract job for {@code table}, polls it until it finishes, and returns
-     * the paths of the files it produced.
-     *
-     * @throws IOException if the job does not end with status {@code SUCCEEDED}
-     */
-    private List<String> executeExtract(
-        String jobId, TableReference table, JobService jobService)
-            throws InterruptedException, IOException {
-      JobReference jobRef = new JobReference()
-          .setProjectId(executingProject)
-          .setJobId(jobId);
-
-      String destinationUri = getExtractDestinationUri(extractDestinationDir);
-      JobConfigurationExtract extract = new JobConfigurationExtract()
-          .setSourceTable(table)
-          .setDestinationFormat("AVRO")
-          .setDestinationUris(ImmutableList.of(destinationUri));
-
-      LOG.info("Starting BigQuery extract job: {}", jobId);
-      jobService.startExtractJob(jobRef, extract);
-      Job extractJob =
-          jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
-      if (parseStatus(extractJob) != Status.SUCCEEDED) {
-        throw new IOException(String.format(
-            "Extract job %s failed, status: %s",
-            extractJob.getJobReference().getJobId(), extractJob.getStatus()));
-      }
-
-      List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
-      return ImmutableList.copyOf(tempFiles);
-    }
-
-    /**
-     * Wraps each extracted Avro file in a source that converts its records to
-     * {@link TableRow TableRows} using {@code tableSchema}.
-     *
-     * <p>Before a file is wrapped, its size is polled until it is no longer reported as
-     * {@code -1} or the retry budget for that file is used up. A file that never becomes
-     * visible is still added.
-     */
-    private List<BoundedSource<TableRow>> createSources(
-        List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
-      final String jsonSchema = JSON_FACTORY.toString(tableSchema);
-
-      SerializableFunction<GenericRecord, TableRow> function =
-          new SerializableFunction<GenericRecord, TableRow>() {
-            @Override
-            public TableRow apply(GenericRecord input) {
-              return AvroUtils.convertGenericRecordToTableRow(
-                  input, fromJsonString(jsonSchema, TableSchema.class));
-            }};
-
-      List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
-      BackOff backoff = new AttemptBoundedExponentialBackOff(
-          MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS);
-      for (String fileName : files) {
-        // Give every file its own retry budget. Sharing one across files meant that once
-        // the attempts were used up, later files were never checked at all.
-        backoff.reset();
-        // Check before sleeping so a file that is already visible costs no delay.
-        do {
-          if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) {
-            break;
-          }
-        } while (BackOffUtils.next(Sleeper.DEFAULT, backoff));
-        avroSources.add(new TransformingSource<>(
-            AvroSource.from(fileName), function, getDefaultOutputCoder()));
-      }
-      return ImmutableList.copyOf(avroSources);
-    }
-
-    /** A reader that forwards every call to a {@link BigQueryServices.BigQueryJsonReader}. */
-    protected static class BigQueryReader extends BoundedSource.BoundedReader<TableRow> {
-      private final BigQuerySourceBase source;
-      private final BigQueryServices.BigQueryJsonReader reader;
-
-      private BigQueryReader(
-          BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
-        this.source = source;
-        this.reader = reader;
-      }
-
-      @Override
-      public BoundedSource<TableRow> getCurrentSource() {
-        return source;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return reader.start();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return reader.advance();
-      }
-
-      @Override
-      public TableRow getCurrent() throws NoSuchElementException {
-        return reader.getCurrent();
-      }
-
-      @Override
-      public void close() throws IOException {
-        reader.close();
-      }
-    }
-  }
-
-  /**
-   * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
-   * and transforms elements to type {@code V}.
-   *
-   * <p>Every operation is delegated to the wrapped source (or its reader). Sources handed
-   * back by the delegate are re-wrapped with the same function and output coder.
-   */
-  @VisibleForTesting
-  static class TransformingSource<T, V> extends BoundedSource<V> {
-    private final BoundedSource<T> boundedSource;
-    private final SerializableFunction<T, V> function;
-    private final Coder<V> outputCoder;
-
-    /**
-     * @param boundedSource the source to read from
-     * @param function the conversion applied to each element read
-     * @param outputCoder the coder reported by {@link #getDefaultOutputCoder}
-     */
-    TransformingSource(
-        BoundedSource<T> boundedSource,
-        SerializableFunction<T, V> function,
-        Coder<V> outputCoder) {
-      this.boundedSource = boundedSource;
-      this.function = function;
-      this.outputCoder = outputCoder;
-    }
-
-    /** Splits the wrapped source and wraps each resulting bundle. */
-    @Override
-    public List<? extends BoundedSource<V>> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      return Lists.transform(
-          boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
-          new Function<BoundedSource<T>, BoundedSource<V>>() {
-            @Override
-            public BoundedSource<V> apply(BoundedSource<T> input) {
-              return new TransformingSource<>(input, function, outputCoder);
-            }
-          });
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return boundedSource.getEstimatedSizeBytes(options);
-    }
-
-    @Override
-    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
-      return boundedSource.producesSortedKeys(options);
-    }
-
-    @Override
-    public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
-      return new TransformingReader(boundedSource.createReader(options));
-    }
-
-    @Override
-    public void validate() {
-      boundedSource.validate();
-    }
-
-    @Override
-    public Coder<V> getDefaultOutputCoder() {
-      return outputCoder;
-    }
-
-    /** A reader that applies the function to each element of the wrapped reader. */
-    private class TransformingReader extends BoundedReader<V> {
-      private final BoundedReader<T> boundedReader;
-
-      private TransformingReader(BoundedReader<T> boundedReader) {
-        this.boundedReader = boundedReader;
-      }
-
-      @Override
-      public synchronized BoundedSource<V> getCurrentSource() {
-        return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return boundedReader.start();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return boundedReader.advance();
-      }
-
-      @Override
-      public V getCurrent() throws NoSuchElementException {
-        T current = boundedReader.getCurrent();
-        return function.apply(current);
-      }
-
-      @Override
-      public void close() throws IOException {
-        boundedReader.close();
-      }
-
-      /**
-       * Splits the wrapped reader at {@code fraction} and wraps the residual source.
-       *
-       * @return the wrapped residual, or {@code null} when the wrapped reader returns
-       *     {@code null} for the split
-       */
-      @Override
-      public synchronized BoundedSource<V> splitAtFraction(double fraction) {
-        BoundedSource<T> residual = boundedReader.splitAtFraction(fraction);
-        // Pass a null result through unchanged. Wrapping it would return a non-null source
-        // around nothing, which callers cannot tell apart from a real split.
-        if (residual == null) {
-          return null;
-        }
-        return new TransformingSource<>(residual, function, outputCoder);
-      }
-
-      @Override
-      public Double getFractionConsumed() {
-        return boundedReader.getFractionConsumed();
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return boundedReader.getCurrentTimestamp();
-      }
-    }
-  }
-
-  private static String getExtractJobId(String jobIdToken) {
-    return jobIdToken + "-extract";
-  }
-
-  private static String getExtractDestinationUri(String extractDestinationDir) {
-    return String.format("%s/%s", extractDestinationDir, "*.avro");
-  }
-
-  /**
-   * Lists the files written by {@code extractJob} under {@code extractDestinationDir}.
-   *
-   * <p>The job statistics must carry exactly one destination URI file count. File names are
-   * the zero-padded 12-digit indices {@code 0 .. count-1} followed by {@code .avro}.
-   *
-   * @throws RuntimeException if the statistics carry no file count or more than one
-   */
-  private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
-      throws IOException {
-    JobStatistics jobStats = extractJob.getStatistics();
-    List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
-    // A missing list is reported like an empty one rather than failing with an NPE.
-    boolean noCounts = counts == null || counts.isEmpty();
-    if (noCounts || counts.size() != 1) {
-      String errorMessage = (noCounts
-          ? "No destination uri file count received."
-          : String.format("More than one destination uri file count received. First two are %s, %s",
-              counts.get(0), counts.get(1)));
-      throw new RuntimeException(errorMessage);
-    }
-    long filesCount = counts.get(0);
-
-    ImmutableList.Builder<String> paths = ImmutableList.builder();
-    IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
-    for (long i = 0; i < filesCount; ++i) {
-      String filePath =
-          factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
-      paths.add(filePath);
-    }
-    return paths.build();
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows}
-   * to a BigQuery table.
-   *
-   * <p>In BigQuery, each table has an enclosing dataset. The dataset being written must already
-   * exist.
-   *
-   * <p>By default, tables will be created if they do not exist, which corresponds to a
-   * {@link CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of BigQuery's
-   * Jobs API. A schema must be provided (via {@link BigQueryIO.Write#withSchema(TableSchema)}),
-   * or else the transform may fail at runtime with an {@link IllegalArgumentException}.
-   *
-   * <p>By default, writes require an empty table, which corresponds to
-   * a {@link WriteDisposition#WRITE_EMPTY} disposition that matches the
-   * default of BigQuery's Jobs API.
-   *
-   * <p>Here is a sample transform that produces TableRow values containing
-   * "word" and "count" columns:
-   * <pre>{@code
-   * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
-   *   public void processElement(ProcessContext c) {
-   *     TableRow row = new TableRow()
-   *         .set("word", c.element().getKey())
-   *         .set("count", c.element().getValue().intValue());
-   *     c.output(row);
-   *   }
-   * }}</pre>
-   */
-  public static class Write {
-    /**
-     * An enumeration type for the BigQuery create disposition strings.
-     *
-     * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition">
-     * <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a>
-     */
-    public enum CreateDisposition {
-      /**
-       * Specifies that tables should not be created.
-       *
-       * <p>If the output table does not exist, the write fails.
-       */
-      CREATE_NEVER,
-
-      /**
-       * Specifies that tables should be created if needed. This is the default
-       * behavior.
-       *
-       * <p>Requires that a table schema is provided via {@link BigQueryIO.Write#withSchema}.
-       * This precondition is checked before starting a job. The schema is
-       * not required to match an existing table's schema.
-       *
-       * <p>When this transformation is executed, if the output table does not
-       * exist, the table is created from the provided schema. Note that even if
-       * the table exists, it may be recreated if necessary when paired with a
-       * {@link WriteDisposition#WRITE_TRUNCATE}.
-       */
-      CREATE_IF_NEEDED
-    }
-
-    /**
-     * An enumeration type for the BigQuery write disposition strings.
-     *
-     * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition">
-     * <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a>
-     */
-    public enum WriteDisposition {
-      /**
-       * Specifies that write should replace a table.
-       *
-       * <p>The replacement may occur in multiple steps - for instance by first
-       * removing the existing table, then creating a replacement, then filling
-       * it in. This is not an atomic operation, and external programs may
-       * see the table in any of these intermediate steps.
-       */
-      WRITE_TRUNCATE,
-
-      /**
-       * Specifies that rows may be appended to an existing table.
-       */
-      WRITE_APPEND,
-
-      /**
-       * Specifies that the output table must be empty. This is the default
-       * behavior.
-       *
-       * <p>If the output table is not empty, the write fails at runtime.
-       *
-       * <p>This check may occur long before data is written, and does not
-       * guarantee exclusive access to the table. If two programs are run
-       * concurrently, each specifying the same output table and
-       * a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is possible
-       * for both to succeed.
-       */
-      WRITE_EMPTY
-    }
-
-    /**
-     * Creates a write transformation for the given table specification.
-     *
-     * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
-     */
-    public static Bound to(String tableSpec) {
-      return new Bound().to(tableSpec);
-    }
-
-    /** Creates a write transformation for the given table. */
-    public static Bound to(TableReference table) {
-      return new Bound().to(table);
-    }
-
-    /**
-     * Creates a write transformation from a function that maps windows to table specifications.
-     * Each time a new window is encountered, this function will be called and the resulting table
-     * will be created. Records within that window will be written to the associated table.
-     *
-     * <p>See {@link #parseTableSpec(String)} for the format that {@code tableSpecFunction} should
-     * return.
-     *
-     * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
-     * always return the same table specification.
-     */
-    public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-      return new Bound().to(tableSpecFunction);
-    }
-
-    /**
-     * Creates a write transformation from a function that maps windows to {@link TableReference}
-     * objects.
-     *
-     * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
-     * always return the same table reference.
-     */
-    public static Bound toTableReference(
-        SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-      return new Bound().toTableReference(tableRefFunction);
-    }
-
-    /**
-     * Creates a write transformation with the specified schema to use in table creation.
-     *
-     * <p>The schema is <i>required</i> only if writing to a table that does not already
-     * exist, and {@link CreateDisposition} is set to
-     * {@link CreateDisposition#CREATE_IF_NEEDED}.
-     */
-    public static Bound withSchema(TableSchema schema) {
-      return new Bound().withSchema(schema);
-    }
-
-    /** Creates a write transformation with the specified options for creating the table. */
-    public static Bound withCreateDisposition(CreateDisposition disposition) {
-      return new Bound().withCreateDisposition(disposition);
-    }
-
-    /** Creates a write transformation with the specified options for writing to the table. */
-    public static Bound withWriteDisposition(WriteDisposition disposition) {
-      return new Bound().withWriteDisposition(disposition);
-    }
-
-    /**
-     * Creates a write transformation with BigQuery table validation disabled.
-     */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
-    }
-
-    /**
-     * A {@link PTransform} that can write either a bounded or unbounded
-     * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
-     */
-    public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
-      @Nullable final String jsonTableRef;
-
-      @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
-
-      // Table schema. The schema is required only if the table does not exist.
-      @Nullable final String jsonSchema;
-
-      // Options for creating the table. Valid values are CREATE_IF_NEEDED and
-      // CREATE_NEVER.
-      final CreateDisposition createDisposition;
-
-      // Options for writing to the table. Valid values are WRITE_TRUNCATE,
-      // WRITE_APPEND and WRITE_EMPTY.
-      final WriteDisposition writeDisposition;
-
-      // An option to indicate if table validation is desired. Default is true.
-      final boolean validate;
-
-      @Nullable private BigQueryServices bigQueryServices;
-
-      /**
-       * Adapts a function returning table specification strings into one returning
-       * {@link TableReference TableReferences}, by passing each result through
-       * {@link #parseTableSpec(String)}.
-       */
-      private static class TranslateTableSpecFunction implements
-          SerializableFunction<BoundedWindow, TableReference> {
-        // Assigned once in the constructor, so it is declared final.
-        private final SerializableFunction<BoundedWindow, String> tableSpecFunction;
-
-        TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-          this.tableSpecFunction = tableSpecFunction;
-        }
-
-        @Override
-        public TableReference apply(BoundedWindow value) {
-          return parseTableSpec(tableSpecFunction.apply(value));
-        }
-      }
-
-      /**
-       * @deprecated Should be private. Instead, use one of the factory methods in
-       * {@link BigQueryIO.Write}, such as {@link BigQueryIO.Write#to(String)}, to create an
-       * instance of this class.
-       */
-      @Deprecated
-      public Bound() {
-        this(
-            null /* name */,
-            null /* jsonTableRef */,
-            null /* tableRefFunction */,
-            null /* jsonSchema */,
-            CreateDisposition.CREATE_IF_NEEDED,
-            WriteDisposition.WRITE_EMPTY,
-            true /* validate */,
-            null /* bigQueryServices */);
-      }
-
-      private Bound(String name, @Nullable String jsonTableRef,
-          @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
-          @Nullable String jsonSchema,
-          CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
-          @Nullable BigQueryServices bigQueryServices) {
-        super(name);
-        this.jsonTableRef = jsonTableRef;
-        this.tableRefFunction = tableRefFunction;
-        this.jsonSchema = jsonSchema;
-        this.createDisposition = checkNotNull(createDisposition, "createDisposition");
-        this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
-        this.validate = validate;
-        this.bigQueryServices = bigQueryServices;
-      }
-
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table. Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(String tableSpec) {
-        return to(parseTableSpec(tableSpec));
-      }
-
-      /**
-       * Returns a copy of this write transformation, but writing to the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(TableReference table) {
-        return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this write transformation, but using the specified function to determine
-       * which table to write to for each window.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
-       * should always return the same table specification.
-       */
-      public Bound to(
-          SerializableFunction<BoundedWindow, String> tableSpecFunction) {
-        return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
-      }
-
-      /**
-       * Returns a copy of this write transformation, but using the specified function to determine
-       * which table to write to for each window.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
-       * always return the same table reference.
-       */
-      public Bound toTableReference(
-          SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this write transformation, but using the specified schema for rows
-       * to be written.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withSchema(TableSchema schema) {
-        return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema),
-            createDisposition, writeDisposition, validate, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this write transformation, but using the specified create disposition.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withCreateDisposition(CreateDisposition createDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this write transformation, but using the specified write disposition.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withWriteDisposition(WriteDisposition writeDisposition) {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, validate, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this write transformation, but without BigQuery table validation.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutValidation() {
-        return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-            writeDisposition, false, bigQueryServices);
-      }
-
-      /**
-       * Creates a new write transformation that is identical to this one except that it uses the
-       * given {@link BigQueryServices} instead of the currently configured ones.
-       */
-      @VisibleForTesting
-      Bound withTestServices(BigQueryServices testServices) {
-        Bound copy = new Bound(
-            name, jsonTableRef, tableRefFunction, jsonSchema,
-            createDisposition, writeDisposition, validate, testServices);
-        return copy;
-      }
-
-      /**
-       * Fails unless the given table is empty. A table that the service reports as not found is
-       * treated as empty.
-       *
-       * @throws IllegalArgumentException if the table contains data
-       * @throws RuntimeException if emptiness cannot be determined, including when the calling
-       *     thread is interrupted (the interrupt flag is restored before throwing)
-       */
-      private static void verifyTableEmpty(
-          DatasetService datasetService,
-          TableReference table) {
-        try {
-          boolean isEmpty = datasetService.isTableEmpty(
-              table.getProjectId(), table.getDatasetId(), table.getTableId());
-          if (!isEmpty) {
-            throw new IllegalArgumentException(
-                "BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
-          }
-        } catch (IOException e) {
-          ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-          if (!errorExtractor.itemNotFound(e)) {
-            throw new RuntimeException(
-                "unable to confirm BigQuery table emptiness for table "
-                    + BigQueryIO.toTableSpec(table), e);
-          }
-          // Otherwise nothing to do: a table that does not exist is considered empty.
-        } catch (InterruptedException e) {
-          // Restore the interrupt flag so code further up the stack can still see the
-          // interruption after it has been wrapped in an unchecked exception.
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(
-              "unable to confirm BigQuery table emptiness for table "
-                  + BigQueryIO.toTableSpec(table), e);
-        }
-      }
-
-      @Override
-      public void validate(PCollection<TableRow> input) {
-        BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-        // Exactly one of the table and table reference can be configured.
-        checkState(
-            jsonTableRef != null || tableRefFunction != null,
-            "must set the table reference of a BigQueryIO.Write transform");
-        checkState(
-            jsonTableRef == null || tableRefFunction == null,
-            "Cannot set both a table reference and a table function for a BigQueryIO.Write"
-                + " transform");
-
-        // Require a schema if creating one or more tables.
-        checkArgument(
-            createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
-            "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
-
-        // The user specified a table.
-        if (jsonTableRef != null && validate) {
-          TableReference table = getTableWithDefaultProject(options);
-
-          DatasetService datasetService = getBigQueryServices().getDatasetService(options);
-          // Check for destination table presence and emptiness for early failure notification.
-          // Note that a presence check can fail when the table or dataset is created by an earlier
-          // stage of the pipeline. For these cases the #withoutValidation method can be used to
-          // disable the check.
-          verifyDatasetPresence(datasetService, table);
-          if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
-            verifyTablePresence(datasetService, table);
-          }
-          if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
-            verifyTableEmpty(datasetService, table);
-          }
-        }
-
-        if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
-          // We will use BigQuery's streaming write API -- validate supported dispositions.
-          checkArgument(
-              createDisposition != CreateDisposition.CREATE_NEVER,
-              "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
-                  + " using a tablespec function.");
-
-          checkArgument(
-              writeDisposition != WriteDisposition.WRITE_TRUNCATE,
-              "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
-                  + " when using a tablespec function.");
-        } else {
-          // We will use a BigQuery load job -- validate the temp location.
-          String tempLocation = options.getTempLocation();
-          checkArgument(
-              !Strings.isNullOrEmpty(tempLocation),
-              "BigQueryIO.Write needs a GCS temp location to store temp files.");
-          if (bigQueryServices == null) {
-            try {
-              GcsPath.fromUri(tempLocation);
-            } catch (IllegalArgumentException e) {
-              throw new IllegalArgumentException(
-                  String.format(
-                      "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
-                      tempLocation),
-                  e);
-            }
-          }
-        }
-      }
-
-      @Override
-      public PDone apply(PCollection<TableRow> input) {
-        BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
-
-        // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
-        // and BigQuery's streaming import API.
-        if (options.isStreaming() || tableRefFunction != null) {
-          return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
-        }
-
-        TableReference table = fromJsonString(jsonTableRef, TableReference.class);
-        if (Strings.isNullOrEmpty(table.getProjectId())) {
-          table.setProjectId(options.getProject());
-        }
-        String jobIdToken = randomUUIDString();
-        String tempLocation = options.getTempLocation();
-        String tempFilePrefix;
-        try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-          tempFilePrefix = factory.resolve(
-                  factory.resolve(tempLocation, "BigQuerySinkTemp"),
-                  jobIdToken);
-        } catch (IOException e) {
-          throw new RuntimeException(
-              String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
-              e);
-        }
-
-        BigQueryServices bqServices = getBigQueryServices();
-        return input.apply("Write", org.apache.beam.sdk.io.Write.to(
-            new BigQuerySink(
-                jobIdToken,
-                table,
-                jsonSchema,
-                getWriteDisposition(),
-                getCreateDisposition(),
-                tempFilePrefix,
-                input.getCoder(),
-                bqServices)));
-      }
-
-      /** Returns a {@link VoidCoder} as the default coder for this transform's output. */
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
-
-      /**
-       * Registers this transform's configuration as display data: the table reference and
-       * schema when set, the table function's class when one is configured, both dispositions,
-       * and the validation flag when it differs from its default of {@code true}.
-       */
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-
-        // Table reference and schema are optional, so they are only added when present.
-        builder
-            .addIfNotNull(DisplayData.item("table", jsonTableRef)
-              .withLabel("Table Reference"))
-            .addIfNotNull(DisplayData.item("schema", jsonSchema)
-              .withLabel("Table Schema"));
-
-        // Only the class of the table function is reported, not the function itself.
-        if (tableRefFunction != null) {
-          builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
-            .withLabel("Table Reference Function"));
-        }
-
-        builder
-            .add(DisplayData.item("createDisposition", createDisposition.toString())
-              .withLabel("Table CreateDisposition"))
-            .add(DisplayData.item("writeDisposition", writeDisposition.toString())
-              .withLabel("Table WriteDisposition"))
-            .addIfNotDefault(DisplayData.item("validation", validate)
-              .withLabel("Validation Enabled"), true);
-      }
-
-      /** Returns the create disposition configured for this write. */
-      public CreateDisposition getCreateDisposition() {
-        return createDisposition;
-      }
-
-      /** Returns the write disposition configured for this write. */
-      public WriteDisposition getWriteDisposition() {
-        return writeDisposition;
-      }
-
-      /** Returns the table schema, parsed from the JSON string held by this transform. */
-      public TableSchema getSchema() {
-        return fromJsonString(jsonSchema, TableSchema.class);
-      }
-
-      /**
-       * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
-       *
-       * <p>If the table's project is not specified, use the executing project.
-       */
-      @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
-        TableReference table = getTable();
-        if (table != null && Strings.isNullOrEmpty(table.getProjectId())) {
-          // If user does not specify a project we assume the table to be located in
-          // the default project.
-          table.setProjectId(bqOptions.getProject());
-        }
-        return table;
-      }
-
-      /**
-       * Returns the table reference parsed from the JSON string held by this transform, or
-       * {@code null}.
-       *
-       * <p>NOTE(review): the {@code null} case presumably arises when no table reference was
-       * set; that depends on how {@code fromJsonString} treats a {@code null} input -- confirm.
-       */
-      @Nullable
-      public TableReference getTable() {
-        return fromJsonString(jsonTableRef, TableReference.class);
-      }
-
-      /** Returns {@code true} if table validation is enabled for this write. */
-      public boolean getValidate() {
-        return validate;
-      }
-
-      /**
-       * Returns the configured {@link BigQueryServices}, creating and remembering a
-       * {@link BigQueryServicesImpl} the first time none is set.
-       */
-      private BigQueryServices getBigQueryServices() {
-        if (bigQueryServices != null) {
-          return bigQueryServices;
-        }
-        bigQueryServices = new BigQueryServicesImpl();
-        return bigQueryServices;
-      }
-    }
-
-    /** Private constructor: disallows construction of this utility class. */
-    private Write() {}
-  }
-
-  /**
-   * {@link BigQuerySink} is implemented as a {@link FileBasedSink}.
-   *
-   * <p>Rows are first written to temp files as newline-delimited JSON; a BigQuery load job then
-   * imports those files into the destination table.
-   */
-  static class BigQuerySink extends FileBasedSink<TableRow> {
-    private final String jobIdToken;
-    @Nullable private final String jsonTable;
-    @Nullable private final String jsonSchema;
-    private final WriteDisposition writeDisposition;
-    private final CreateDisposition createDisposition;
-    private final Coder<TableRow> coder;
-    private final BigQueryServices bqServices;
-
-    /**
-     * Creates a sink writing temp files under {@code tempFile} with a {@code .json} extension.
-     *
-     * @param jobIdToken prefix used for the ids of the load jobs
-     * @param table destination table; when non-null it must carry a project id
-     * @param jsonSchema JSON form of the table schema, or {@code null}
-     * @param writeDisposition write disposition passed to the load job
-     * @param createDisposition create disposition passed to the load job
-     * @param tempFile base name for the temp files
-     * @param coder coder used to encode each row into the temp files
-     * @param bqServices services used to obtain the job service
-     */
-    public BigQuerySink(
-        String jobIdToken,
-        @Nullable TableReference table,
-        @Nullable String jsonSchema,
-        WriteDisposition writeDisposition,
-        CreateDisposition createDisposition,
-        String tempFile,
-        Coder<TableRow> coder,
-        BigQueryServices bqServices) {
-      super(tempFile, ".json");
-      this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
-      if (table == null) {
-        this.jsonTable = null;
-      } else {
-        checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
-            "Table %s should have a project specified", table);
-        // The table is kept as JSON text rather than as a TableReference object.
-        this.jsonTable = toJsonString(table);
-      }
-      this.jsonSchema = jsonSchema;
-      this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
-      this.createDisposition = checkNotNull(createDisposition, "createDisposition");
-      this.coder = checkNotNull(coder, "coder");
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-    }
-
-    /** Returns a new {@link BigQueryWriteOperation} bound to this sink. */
-    @Override
-    public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
-        PipelineOptions options) {
-      return new BigQueryWriteOperation(this);
-    }
-
-    /** Adds the schema and table specification to the display data when they are set. */
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      builder
-          .addIfNotNull(DisplayData.item("schema", jsonSchema)
-            .withLabel("Table Schema"))
-          .addIfNotNull(DisplayData.item("tableSpec", jsonTable)
-            .withLabel("Table Specification"));
-    }
-
-    /** Write operation that imports the written temp files with BigQuery load jobs. */
-    private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
-      // The maximum number of retry load jobs.
-      private static final int MAX_RETRY_LOAD_JOBS = 3;
-
-      // The maximum number of retries to poll the status of a load job.
-      // It is set to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
-      private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
-      private final BigQuerySink bigQuerySink;
-
-      private BigQueryWriteOperation(BigQuerySink sink) {
-        super(checkNotNull(sink, "sink"));
-        this.bigQuerySink = sink;
-      }
-
-      /** Returns a {@link TableRowWriter} that encodes rows with the sink's coder. */
-      @Override
-      public FileBasedWriter<TableRow> createWriter(PipelineOptions options) throws Exception {
-        return new TableRowWriter(this, bigQuerySink.coder);
-      }
-
-      /**
-       * Loads every file named in the writer results into BigQuery, skipping the load when
-       * there are no files, and removes the temporary files whether or not the load succeeded.
-       */
-      @Override
-      public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
-          throws IOException, InterruptedException {
-        try {
-          BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-          List<String> tempFiles = Lists.newArrayList();
-          for (FileResult result : writerResults) {
-            tempFiles.add(result.getFilename());
-          }
-          if (!tempFiles.isEmpty()) {
-            load(
-                bigQuerySink.bqServices.getJobService(bqOptions),
-                bigQuerySink.jobIdToken,
-                fromJsonString(bigQuerySink.jsonTable, TableReference.class),
-                tempFiles,
-                fromJsonString(bigQuerySink.jsonSchema, TableSchema.class),
-                bigQuerySink.writeDisposition,
-                bigQuerySink.createDisposition);
-          }
-        } finally {
-          removeTemporaryFiles(options);
-        }
-      }
-
-      /**
-       * Import files into BigQuery with load jobs.
-       *
-       * <p>Returns if files are successfully loaded into BigQuery.
-       * Throws a RuntimeException if:
-       *     1. The status of one load job is UNKNOWN. This is to avoid duplicating data.
-       *     2. It exceeds {@code MAX_RETRY_LOAD_JOBS}.
-       *
-       * <p>If a load job failed, it will try another load job with a different job id.
-       *
-       * @throws NullPointerException if {@code ref} is {@code null}
-       */
-      private void load(
-          JobService jobService,
-          String jobIdPrefix,
-          TableReference ref,
-          List<String> gcsUris,
-          @Nullable TableSchema schema,
-          WriteDisposition writeDisposition,
-          CreateDisposition createDisposition) throws InterruptedException, IOException {
-        // The sink's constructor accepts a null table, but a load job needs a destination.
-        // Fail with an explicit message instead of a bare NullPointerException further down.
-        checkNotNull(ref, "Cannot start a BigQuery load job without a destination table");
-
-        JobConfigurationLoad loadConfig = new JobConfigurationLoad()
-            .setSourceUris(gcsUris)
-            .setDestinationTable(ref)
-            .setSchema(schema)
-            .setWriteDisposition(writeDisposition.name())
-            .setCreateDisposition(createDisposition.name())
-            .setSourceFormat("NEWLINE_DELIMITED_JSON");
-
-        boolean retrying = false;
-        String projectId = ref.getProjectId();
-        for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) {
-          // Each attempt gets its own job id, derived from the prefix and the attempt index.
-          String jobId = jobIdPrefix + "-" + i;
-          if (retrying) {
-            LOG.info("Previous load jobs failed, retrying.");
-          }
-          LOG.info("Starting BigQuery load job: {}", jobId);
-          JobReference jobRef = new JobReference()
-              .setProjectId(projectId)
-              .setJobId(jobId);
-          jobService.startLoadJob(jobRef, loadConfig);
-          Status jobStatus =
-              parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES));
-          switch (jobStatus) {
-            case SUCCEEDED:
-              return;
-            case UNKNOWN:
-              throw new RuntimeException("Failed to poll the load job status.");
-            case FAILED:
-              LOG.info("BigQuery load job failed: {}", jobId);
-              retrying = true;
-              continue;
-            default:
-              throw new IllegalStateException("Unexpected job status: " + jobStatus);
-          }
-        }
-        throw new RuntimeException(
-            "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS);
-      }
-    }
-
-    /** Writer that encodes each {@link TableRow} with the given coder, one row per line. */
-    private static class TableRowWriter extends FileBasedWriter<TableRow> {
-      private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
-      private final Coder<TableRow> coder;
-      private OutputStream out;
-
-      public TableRowWriter(
-          FileBasedWriteOperation<TableRow> writeOperation, Coder<TableRow> coder) {
-        super(writeOperation);
-        this.mimeType = MimeTypes.TEXT;
-        this.coder = coder;
-      }
-
-      /** Wraps the channel in the output stream used by {@link #write}. */
-      @Override
-      protected void prepareWrite(WritableByteChannel channel) throws Exception {
-        out = Channels.newOutputStream(channel);
-      }
-
-      /** Encodes the row to the output stream and terminates it with a newline. */
-      @Override
-      public void write(TableRow value) throws Exception {
-        // Use Context.OUTER to encode and NEWLINE as the delimiter.
-        coder.encode(value, out, Context.OUTER);
-        out.write(NEWLINE);
-      }
-    }
-  }
-
-  /**
-   * Confirms that the dataset containing the given table can be fetched.
-   *
-   * @throws IllegalArgumentException if the service reports the dataset as not found
-   * @throws RuntimeException if the lookup fails for any other reason; when the failure is an
-   *     interruption, the thread's interrupt flag is restored first
-   */
-  private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
-    try {
-      datasetService.getDataset(table.getProjectId(), table.getDatasetId());
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        // The broad catch also captures interruptions; restore the flag before rethrowing.
-        Thread.currentThread().interrupt();
-      }
-      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-      if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
-        throw new IllegalArgumentException(
-            String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)),
-            e);
-      } else {
-        throw new RuntimeException(
-            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset",
-                BigQueryIO.toTableSpec(table)),
-            e);
-      }
-    }
-  }
-
-  /**
-   * Confirms that the given table can be fetched.
-   *
-   * @throws IllegalArgumentException if the service reports the table as not found
-   * @throws RuntimeException if the lookup fails for any other reason; when the failure is an
-   *     interruption, the thread's interrupt flag is restored first
-   */
-  private static void verifyTablePresence(DatasetService datasetService, TableReference table) {
-    try {
-      datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId());
-    } catch (Exception e) {
-      if (e instanceof InterruptedException) {
-        // The broad catch also captures interruptions; restore the flag before rethrowing.
-        Thread.currentThread().interrupt();
-      }
-      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
-      if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
-        throw new IllegalArgumentException(
-            String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e);
-      } else {
-        throw new RuntimeException(
-            String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table",
-                BigQueryIO.toTableSpec(table)),
-            e);
-      }
-    }
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Implementation of DoFn to perform streaming BigQuery write.
-   *
-   * <p>Rows are buffered per table spec while a bundle is processed and written out when the
-   * bundle finishes.
-   */
-  @SystemDoFnInternal
-  private static class StreamingWriteFn
-      extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
-    /** TableSchema in JSON. Use String to make the class Serializable. */
-    private final String jsonTableSchema;
-
-    /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */
-    private transient Map<String, List<TableRow>> tableRows;
-
-    /** The list of unique ids for each BigQuery table row. */
-    private transient Map<String, List<String>> uniqueIdsForTableRows;
-
-    /**
-     * The list of tables created so far, so we don't try the creation each time.
-     *
-     * <p>Declared {@code final} because it also serves as the lock in
-     * {@link #getOrCreateTable}; a lock object must never be reassigned.
-     */
-    private static final Set<String> createdTables =
-        Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
-
-    /** Tracks bytes written, exposed as "ByteCount" Counter. */
-    private Aggregator<Long, Long> byteCountAggregator =
-        createAggregator("ByteCount", new Sum.SumLongFn());
-
-    /** Constructor. Stores the schema in its JSON form. */
-    StreamingWriteFn(TableSchema schema) {
-      jsonTableSchema = toJsonString(schema);
-    }
-
-    /** Resets the per-bundle buffers of rows and unique ids. */
-    @Override
-    public void startBundle(Context context) {
-      tableRows = new HashMap<>();
-      uniqueIdsForTableRows = new HashMap<>();
-    }
-
-    /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */
-    @Override
-    public void processElement(ProcessContext context) {
-      // The element's key carries the table spec the row is destined for.
-      String tableSpec = context.element().getKey().getKey();
-      List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec);
-      List<String> uniqueIds = getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec);
-
-      rows.add(context.element().getValue().tableRow);
-      uniqueIds.add(context.element().getValue().uniqueId);
-    }
-
-    /** Writes the accumulated rows into BigQuery with streaming API. */
-    @Override
-    public void finishBundle(Context context) throws Exception {
-      BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
-      Bigquery client = Transport.newBigQueryClient(options).build();
-
-      for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
-        TableReference tableReference = getOrCreateTable(options, entry.getKey());
-        flushRows(client, tableReference, entry.getValue(),
-            uniqueIdsForTableRows.get(entry.getKey()), options);
-      }
-      tableRows.clear();
-      uniqueIdsForTableRows.clear();
-    }
-
-    /** Adds the JSON table schema to the display data when it is set. */
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-
-      builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema)
-        .withLabel("Table Schema"));
-    }
-
-    /**
-     * Returns the reference for the given table spec, first requesting creation of the table
-     * unless this process has already done so for the same spec.
-     */
-    public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
-        throws IOException {
-      TableReference tableReference = parseTableSpec(tableSpec);
-      if (!createdTables.contains(tableSpec)) {
-        synchronized (createdTables) {
-          // Another thread may have succeeded in creating the table in the meanwhile, so
-          // check again. This check isn't needed for correctness, but we add it to prevent
-          // every thread from attempting a create and overwhelming our BigQuery quota.
-          if (!createdTables.contains(tableSpec)) {
-            TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
-            Bigquery client = Transport.newBigQueryClient(options).build();
-            BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
-            inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
-                CreateDisposition.CREATE_IF_NEEDED, tableSchema);
-            createdTables.add(tableSpec);
-          }
-        }
-      }
-      return tableReference;
-    }
-
-    /**
-     * Writes the given rows, if any, into the given table with the streaming API. An
-     * {@link IOException} from the insert is rethrown wrapped in a {@link RuntimeException}.
-     */
-    private void flushRows(Bigquery client, TableReference tableReference,
-        List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) {
-      if (!tableRows.isEmpty()) {
-        try {
-          BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
-          inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator);
-        } catch (IOException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-  }
-
-  /** Pairs a key with a shard number; both are fixed at construction time. */
-  private static class ShardedKey<K> {
-    private final K key;
-    private final int shardNumber;
-
-    private ShardedKey(K key, int shardNumber) {
-      this.shardNumber = shardNumber;
-      this.key = key;
-    }
-
-    /** Creates a {@code ShardedKey} holding the given key and shard number. */
-    public static <K> ShardedKey<K> of(K key, int shardNumber) {
-      ShardedKey<K> shardedKey = new ShardedKey<>(key, shardNumber);
-      return shardedKey;
-    }
-
-    /** Returns the wrapped key. */
-    public K getKey() {
-      return this.key;
-    }
-
-    /** Returns the shard number. */
-    public int getShardNumber() {
-      return this.shardNumber;
-    }
-  }
-
-  /**
-   * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
-   */
-  private static class ShardedKeyCoder<KeyT>
-      extends StandardCoder<ShardedKey<KeyT>> {
-    public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
-      return new ShardedKeyCoder<>(keyCoder);
-    }
-
-    @JsonCreator
-    public static <KeyT> ShardedKeyCoder<KeyT> of(
-         @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
-        List<Coder<KeyT>> components) {
-      checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
-      return of(components.get(0));
-    }
-
-    protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
-      this.keyCoder = keyCoder;
-      this.shardNumberCoder = VarIntCoder.of();
-    }
-
-    @Override
-    public List<? extends Coder<?>> getCoderArguments() {
-      return Arrays.asList(keyCoder);
-    }
-
-    @Override
-    public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
-        throws IOException {
-      keyCoder.encode(key.getKey(), outStream, context.nested());
-      shardNumberCoder.encode(key.getShardNumber(), outStream, context);
-    }
-
-    @Override
-    public ShardedKey<KeyT> decode(InputStream inStream, Context context)
-        throws IOException {
-      return new ShardedKey<KeyT>(
-          keyCoder.decode(inStream, context.nested()),
-          shardNumberCoder.decode(inStream, context));
-    }
-
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      keyCoder.verifyDeterministic();
-    }
-
-    Coder<KeyT> keyCoder;
-    VarIntCoder shardNumberCoder;
-  }
-
-  /**
-   * A {@link Coder} for {@link TableRowInfo} that writes the table row followed by its unique
-   * id, both in the nested context. A single shared instance is handed out by {@link #of}.
-   */
-  private static class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
-    private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
-
-    TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of();
-    StringUtf8Coder idCoder = StringUtf8Coder.of();
-
-    /** Returns the shared coder instance. */
-    @JsonCreator
-    public static TableRowInfoCoder of() {
-      return INSTANCE;
-    }
-
-    /** Encodes the row and then its unique id; {@code null} values are rejected. */
-    @Override
-    public void encode(TableRowInfo value, OutputStream outStream, Context context)
-      throws IOException {
-      if (null == value) {
-        throw new CoderException("cannot encode a null value");
-      }
-      TableRow row = value.tableRow;
-      tableRowCoder.encode(row, outStream, context.nested());
-      String id = value.uniqueId;
-      idCoder.encode(id, outStream, context.nested());
-    }
-
-    /** Decodes the row and then its unique id, mirroring {@link #encode}. */
-    @Override
-    public TableRowInfo decode(InputStream inStream, Context context)
-      throws IOException {
-      TableRow row = tableRowCoder.decode(inStream, context.nested());
-      String id = idCoder.decode(inStream, context.nested());
-      return new TableRowInfo(row, id);
-    }
-
-    /** Always throws: this coder is declared non-deterministic. */
-    @Override
-    public void verifyDeterministic() throws NonDeterministicException {
-      throw new NonDeterministicException(this, "TableRows are not deterministic.");
-    }
-  }
-
-  private static class TableRowInfo {
-    TableRowInfo(TableRo

<TRUNCATED>