Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/28 15:51:47 UTC

[5/6] beam git commit: Refactor BigQueryIO helper methods into a BigQueryHelpers class. Move helper transforms for BigQueryIO.Read into individual files. Change helper methods from private to package-private in BigQueryHelpers.

Refactor BigQueryIO helper methods into a BigQueryHelpers class.
Move helper transforms for BigQueryIO.Read into individual files.
Change helper methods from private to package-private in BigQueryHelpers.
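
For code in the same package, the practical effect is a relocation: the helpers
are now addressed through BigQueryHelpers and, being package-private, are no
longer part of BigQueryIO's public surface. A minimal before/after sketch with
a hypothetical caller inside org.apache.beam.sdk.io.gcp.bigquery:

    // Before this commit: the helper was a public static on BigQueryIO.
    TableReference before = BigQueryIO.parseTableSpec("my-project:my_dataset.my_table");

    // After this commit: the same helper, moved to BigQueryHelpers.
    TableReference after = BigQueryHelpers.parseTableSpec("my-project:my_dataset.my_table");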


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2cc2e81f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2cc2e81f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2cc2e81f

Branch: refs/heads/master
Commit: 2cc2e81f241e1d5d590f12d4a8491273908e2cce
Parents: 434eadb
Author: Reuven Lax <re...@google.com>
Authored: Fri Mar 17 15:15:16 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Mar 28 08:46:15 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/bigquery/BigQueryHelpers.java    | 243 ++++++
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 853 ++-----------------
 .../io/gcp/bigquery/BigQueryQuerySource.java    | 186 ++++
 .../io/gcp/bigquery/BigQueryServicesImpl.java   |   2 +-
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 178 ++++
 .../io/gcp/bigquery/BigQueryTableSource.java    |  86 ++
 .../io/gcp/bigquery/PassThroughThenCleanup.java |  66 ++
 .../sdk/io/gcp/bigquery/TransformingSource.java | 118 +++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     |  54 +-
 .../sdk/io/gcp/bigquery/BigQueryUtilTest.java   |  12 +-
 10 files changed, 970 insertions(+), 828 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
new file mode 100644
index 0000000..9fba938
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -0,0 +1,243 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+
+/**
+ * A set of helper functions and classes used by {@link BigQueryIO}.
+ */
+public class BigQueryHelpers {
+  /**
+   * Returns a displayable string representation for a {@link TableReference}.
+   */
+  @Nullable
+  static ValueProvider<String> displayTable(
+      @Nullable ValueProvider<TableReference> table) {
+    if (table == null) {
+      return null;
+    }
+    return NestedValueProvider.of(table, new TableRefToTableSpec());
+  }
+
+  /**
+   * Returns a canonical string representation of the {@link TableReference}.
+   */
+  static String toTableSpec(TableReference ref) {
+    StringBuilder sb = new StringBuilder();
+    if (ref.getProjectId() != null) {
+      sb.append(ref.getProjectId());
+      sb.append(":");
+    }
+
+    sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
+    return sb.toString();
+  }
+
+  static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
+    List<V> value = map.get(key);
+    if (value == null) {
+      value = new ArrayList<>();
+      map.put(key, value);
+    }
+    return value;
+  }
+
+  /**
+   * Parses a table specification in the form
+   * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}.
+   *
+   * <p>If the project id is omitted, the default project id is used.
+   */
+  static TableReference parseTableSpec(String tableSpec) {
+    Matcher match = BigQueryIO.TABLE_SPEC.matcher(tableSpec);
+    if (!match.matches()) {
+      throw new IllegalArgumentException(
+          "Table reference is not in [project_id]:[dataset_id].[table_id] "
+          + "format: " + tableSpec);
+    }
+
+    TableReference ref = new TableReference();
+    ref.setProjectId(match.group("PROJECT"));
+
+    return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
+  }
+
+  static String jobToPrettyString(@Nullable Job job) throws IOException {
+    return job == null ? "null" : job.toPrettyString();
+  }
+
+  static String statusToPrettyString(@Nullable JobStatus status) throws IOException {
+    return status == null ? "Unknown status: null." : status.toPrettyString();
+  }
+
+  static Status parseStatus(@Nullable Job job) {
+    if (job == null) {
+      return Status.UNKNOWN;
+    }
+    JobStatus status = job.getStatus();
+    if (status.getErrorResult() != null) {
+      return Status.FAILED;
+    } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
+      return Status.FAILED;
+    } else {
+      return Status.SUCCEEDED;
+    }
+  }
+
+  @VisibleForTesting
+  static String toJsonString(Object item) {
+    if (item == null) {
+      return null;
+    }
+    try {
+      return BigQueryIO.JSON_FACTORY.toString(item);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()),
+          e);
+    }
+  }
+
+  @VisibleForTesting
+  static <T> T fromJsonString(String json, Class<T> clazz) {
+    if (json == null) {
+      return null;
+    }
+    try {
+      return BigQueryIO.JSON_FACTORY.fromString(json, clazz);
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json),
+          e);
+    }
+  }
+
+  /**
+   * Returns a random UUID string.
+   *
+   * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset IDs.
+   */
+  static String randomUUIDString() {
+    return UUID.randomUUID().toString().replaceAll("-", "");
+  }
+
+  @VisibleForTesting
+  static class JsonSchemaToTableSchema
+      implements SerializableFunction<String, TableSchema> {
+    @Override
+    public TableSchema apply(String from) {
+      return fromJsonString(from, TableSchema.class);
+    }
+  }
+
+  @VisibleForTesting
+  static class BeamJobUuidToBigQueryJobUuid
+      implements SerializableFunction<String, String> {
+    @Override
+    public String apply(String from) {
+      return "beam_job_" + from;
+    }
+  }
+
+  static class TableSchemaToJsonSchema
+      implements SerializableFunction<TableSchema, String> {
+    @Override
+    public String apply(TableSchema from) {
+      return toJsonString(from);
+    }
+  }
+
+  static class JsonTableRefToTableRef
+      implements SerializableFunction<String, TableReference> {
+    @Override
+    public TableReference apply(String from) {
+      return fromJsonString(from, TableReference.class);
+    }
+  }
+
+  static class TableRefToTableSpec
+      implements SerializableFunction<TableReference, String> {
+    @Override
+    public String apply(TableReference from) {
+      return toTableSpec(from);
+    }
+  }
+
+  static class TableRefToJson
+      implements SerializableFunction<TableReference, String> {
+    @Override
+    public String apply(TableReference from) {
+      return toJsonString(from);
+    }
+  }
+
+  static class TableRefToProjectId
+      implements SerializableFunction<TableReference, String> {
+    @Override
+    public String apply(TableReference from) {
+      return from.getProjectId();
+    }
+  }
+
+  @VisibleForTesting
+  static class TableSpecToTableRef
+      implements SerializableFunction<String, TableReference> {
+    @Override
+    public TableReference apply(String from) {
+      return parseTableSpec(from);
+    }
+  }
+
+  @VisibleForTesting
+  static class CreatePerBeamJobUuid
+      implements SerializableFunction<String, String> {
+    private final String stepUuid;
+
+    CreatePerBeamJobUuid(String stepUuid) {
+      this.stepUuid = stepUuid;
+    }
+
+    @Override
+    public String apply(String jobUuid) {
+      return stepUuid + "_" + jobUuid.replaceAll("-", "");
+    }
+  }
+
+  @VisibleForTesting
+  static class CreateJsonTableRefFromUuid
+      implements SerializableFunction<String, TableReference> {
+    private final String executingProject;
+
+    public CreateJsonTableRefFromUuid(String executingProject) {
+      this.executingProject = executingProject;
+    }
+
+    @Override
+    public TableReference apply(String jobUuid) {
+      String queryTempDatasetId = "temp_dataset_" + jobUuid;
+      String queryTempTableId = "temp_table_" + jobUuid;
+      TableReference queryTempTableRef = new TableReference()
+          .setProjectId(executingProject)
+          .setDatasetId(queryTempDatasetId)
+          .setTableId(queryTempTableId);
+      return queryTempTableRef;
+    }
+  }
+}
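
The table-spec helpers above accept both the fully qualified and the
project-less forms, and the SerializableFunction wrappers exist so these
conversions can be deferred inside a ValueProvider until pipeline execution.
A short sketch under hypothetical values, assuming the ValueProvider and
StaticValueProvider imports used elsewhere in this commit (package-private,
so callable only from within org.apache.beam.sdk.io.gcp.bigquery):

    // Both forms parse; in the second, the project id is left null and the
    // default project is filled in elsewhere.
    TableReference a = BigQueryHelpers.parseTableSpec("my-project:my_dataset.my_table");
    TableReference b = BigQueryHelpers.parseTableSpec("my_dataset.my_table");

    // Deferred conversion: TableRefToTableSpec is applied only on get().
    ValueProvider<String> spec = NestedValueProvider.of(
        StaticValueProvider.of(a), new BigQueryHelpers.TableRefToTableSpec());
    // spec.get() returns "my-project:my_dataset.my_table"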

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d195afd..3f2f3e8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -25,13 +25,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.api.client.json.JsonFactory;
 import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
 import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
@@ -39,38 +37,31 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.auto.value.AutoValue;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.io.CountingOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.ObjectInputStream;
 import java.io.OutputStream;
-import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
@@ -82,8 +73,16 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.TableRowJsonCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.AvroSource;
 import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
 import org.apache.beam.sdk.metrics.Counter;
@@ -131,7 +130,6 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
-import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -153,7 +151,7 @@ import org.slf4j.LoggerFactory;
  * from the <a href="https://cloud.google.com/bigquery/client-libraries">
  * BigQuery Java Client API</a>.
  * Tables can be referred to as Strings, with or without the {@code projectId}.
- * A helper function is provided ({@link BigQueryIO#parseTableSpec(String)})
+ * A helper function is provided ({@link BigQueryHelpers#parseTableSpec(String)})
  * that parses the following string forms into a {@link TableReference}:
  *
  * <ul>
@@ -253,7 +251,7 @@ public class BigQueryIO {
    * Singleton instance of the JSON factory used to read and write JSON
    * formatted rows.
    */
-  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
+  static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
 
   /**
    * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
@@ -281,7 +279,7 @@ public class BigQueryIO {
       String.format("((?<PROJECT>%s):)?(?<DATASET>%s)\\.(?<TABLE>%s)", PROJECT_ID_REGEXP,
           DATASET_REGEXP, TABLE_REGEXP);
 
-  private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
+  static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
 
   private static final String RESOURCE_NOT_FOUND_ERROR =
       "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
@@ -293,152 +291,6 @@ public class BigQueryIO {
           + " an earlier stage of the pipeline, this validation can be disabled using"
           + " #withoutValidation.";
 
-  /**
-   * Parse a table specification in the form
-   * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}.
-   *
-   * <p>If the project id is omitted, the default project id is used.
-   */
-  public static TableReference parseTableSpec(String tableSpec) {
-    Matcher match = TABLE_SPEC.matcher(tableSpec);
-    if (!match.matches()) {
-      throw new IllegalArgumentException(
-          "Table reference is not in [project_id]:[dataset_id].[table_id] "
-          + "format: " + tableSpec);
-    }
-
-    TableReference ref = new TableReference();
-    ref.setProjectId(match.group("PROJECT"));
-
-    return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
-  }
-
-  /**
-   * Returns a canonical string representation of the {@link TableReference}.
-   */
-  public static String toTableSpec(TableReference ref) {
-    StringBuilder sb = new StringBuilder();
-    if (ref.getProjectId() != null) {
-      sb.append(ref.getProjectId());
-      sb.append(":");
-    }
-
-    sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
-    return sb.toString();
-  }
-
-  @VisibleForTesting
-  static class JsonSchemaToTableSchema
-      implements SerializableFunction<String, TableSchema> {
-    @Override
-    public TableSchema apply(String from) {
-      return fromJsonString(from, TableSchema.class);
-    }
-  }
-
-  private static class TableSchemaToJsonSchema
-      implements SerializableFunction<TableSchema, String> {
-    @Override
-    public String apply(TableSchema from) {
-      return toJsonString(from);
-    }
-  }
-
-  private static class JsonTableRefToTableRef
-      implements SerializableFunction<String, TableReference> {
-    @Override
-    public TableReference apply(String from) {
-      return fromJsonString(from, TableReference.class);
-    }
-  }
-
-  private static class TableRefToTableSpec
-      implements SerializableFunction<TableReference, String> {
-    @Override
-    public String apply(TableReference from) {
-      return toTableSpec(from);
-    }
-  }
-
-  private static class TableRefToJson
-      implements SerializableFunction<TableReference, String> {
-    @Override
-    public String apply(TableReference from) {
-      return toJsonString(from);
-    }
-  }
-
-  private static class TableRefToProjectId
-      implements SerializableFunction<TableReference, String> {
-    @Override
-    public String apply(TableReference from) {
-      return from.getProjectId();
-    }
-  }
-
-  @VisibleForTesting
-  static class TableSpecToTableRef
-      implements SerializableFunction<String, TableReference> {
-    @Override
-    public TableReference apply(String from) {
-      return parseTableSpec(from);
-    }
-  }
-
-  @VisibleForTesting
-  static class BeamJobUuidToBigQueryJobUuid
-      implements SerializableFunction<String, String> {
-    @Override
-    public String apply(String from) {
-      return "beam_job_" + from;
-    }
-  }
-
-  @VisibleForTesting
-  static class CreatePerBeamJobUuid
-      implements SerializableFunction<String, String> {
-    private final String stepUuid;
-
-    private CreatePerBeamJobUuid(String stepUuid) {
-      this.stepUuid = stepUuid;
-    }
-
-    @Override
-    public String apply(String jobUuid) {
-      return stepUuid + "_" + jobUuid.replaceAll("-", "");
-    }
-  }
-
-  @VisibleForTesting
-  static class CreateJsonTableRefFromUuid
-      implements SerializableFunction<String, TableReference> {
-    private final String executingProject;
-
-    private CreateJsonTableRefFromUuid(String executingProject) {
-      this.executingProject = executingProject;
-    }
-
-    @Override
-    public TableReference apply(String jobUuid) {
-      String queryTempDatasetId = "temp_dataset_" + jobUuid;
-      String queryTempTableId = "temp_table_" + jobUuid;
-      TableReference queryTempTableRef = new TableReference()
-          .setProjectId(executingProject)
-          .setDatasetId(queryTempDatasetId)
-          .setTableId(queryTempTableId);
-      return queryTempTableRef;
-    }
-  }
-
-  @Nullable
-  private static ValueProvider<String> displayTable(
-      @Nullable ValueProvider<TableReference> table) {
-    if (table == null) {
-      return null;
-    }
-    return NestedValueProvider.of(table, new TableRefToTableSpec());
-  }
-
 
   /**
    * A formatting function that maps a TableRow to itself. This allows sending a
@@ -556,7 +408,7 @@ public class BigQueryIO {
      * Read from table specified by a {@link TableReference}.
      */
     public Read from(TableReference table) {
-      return from(StaticValueProvider.of(toTableSpec(table)));
+      return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
     }
 
     private static final String QUERY_VALIDATION_FAILURE_ERROR =
@@ -672,7 +524,7 @@ public class BigQueryIO {
 
     @Override
     public PCollection<TableRow> expand(PBegin input) {
-      String stepUuid = randomUUIDString();
+      String stepUuid = BigQueryHelpers.randomUUIDString();
       BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
       ValueProvider<String> jobUuid = NestedValueProvider.of(
          StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
@@ -752,7 +604,7 @@ public class BigQueryIO {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
       builder
-          .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
+          .addIfNotNull(DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider()))
             .withLabel("Table"))
           .addIfNotNull(DisplayData.item("query", getQuery())
             .withLabel("Query"))
@@ -787,7 +639,7 @@ public class BigQueryIO {
         TableReference tableRef = table.get();
         tableRef.setProjectId(bqOptions.getProject());
         return NestedValueProvider.of(StaticValueProvider.of(
-            toJsonString(tableRef)), new JsonTableRefToTableRef());
+            BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef());
       }
       return table;
     }
@@ -810,541 +662,15 @@ public class BigQueryIO {
     }
   }
 
-  /**
-   * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
-   * has been processed.
-   */
-  @VisibleForTesting
-  static class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
-    private CleanupOperation cleanupOperation;
-
-    PassThroughThenCleanup(CleanupOperation cleanupOperation) {
-      this.cleanupOperation = cleanupOperation;
-    }
-
-    @Override
-    public PCollection<T> expand(PCollection<T> input) {
-      TupleTag<T> mainOutput = new TupleTag<>();
-      TupleTag<Void> cleanupSignal = new TupleTag<>();
-      PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
-          .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
-
-      PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
-          .setCoder(VoidCoder.of())
-          .apply(View.<Void>asSingleton().withDefaultValue(null));
-
-      input.getPipeline()
-          .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
-          .apply("Cleanup", ParDo.of(
-              new DoFn<CleanupOperation, Void>() {
-                @ProcessElement
-                public void processElement(ProcessContext c)
-                    throws Exception {
-                  c.element().cleanup(c.getPipelineOptions());
-                }
-              }).withSideInputs(cleanupSignalView));
-
-      return outputs.get(mainOutput);
-    }
-
-    private static class IdentityFn<T> extends DoFn<T, T> {
-      @ProcessElement
-      public void processElement(ProcessContext c) {
-        c.output(c.element());
-      }
-    }
-
-    abstract static class CleanupOperation implements Serializable {
-      abstract void cleanup(PipelineOptions options) throws Exception;
-    }
-  }
-
-  /**
-   * A {@link BigQuerySourceBase} for reading BigQuery tables.
-   */
-  @VisibleForTesting
-  static class BigQueryTableSource extends BigQuerySourceBase {
-
-    static BigQueryTableSource create(
-        ValueProvider<String> jobIdToken,
-        ValueProvider<TableReference> table,
-        String extractDestinationDir,
-        BigQueryServices bqServices,
-        ValueProvider<String> executingProject) {
-      return new BigQueryTableSource(
-          jobIdToken, table, extractDestinationDir, bqServices, executingProject);
-    }
-
-    private final ValueProvider<String> jsonTable;
-    private final AtomicReference<Long> tableSizeBytes;
-
-    private BigQueryTableSource(
-        ValueProvider<String> jobIdToken,
-        ValueProvider<TableReference> table,
-        String extractDestinationDir,
-        BigQueryServices bqServices,
-        ValueProvider<String> executingProject) {
-      super(jobIdToken, extractDestinationDir, bqServices, executingProject);
-      this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
-      this.tableSizeBytes = new AtomicReference<>();
-    }
-
-    @Override
-    protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
-      checkState(jsonTable.isAccessible());
-      return JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
-    }
-
-    @Override
-    public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      checkState(jsonTable.isAccessible());
-      TableReference tableRef = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
-      return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
-    }
-
-    @Override
-    public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      if (tableSizeBytes.get() == null) {
-        TableReference table = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
-
-        Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
-            .getTable(table).getNumBytes();
-        tableSizeBytes.compareAndSet(null, numBytes);
-      }
-      return tableSizeBytes.get();
-    }
-
-    @Override
-    protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
-      // Do nothing.
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("table", jsonTable));
-    }
-  }
-
-  /**
-   * A {@link BigQuerySourceBase} for querying BigQuery tables.
-   */
-  @VisibleForTesting
-  static class BigQueryQuerySource extends BigQuerySourceBase {
-
-    static BigQueryQuerySource create(
-        ValueProvider<String> jobIdToken,
-        ValueProvider<String> query,
-        ValueProvider<TableReference> queryTempTableRef,
-        Boolean flattenResults,
-        Boolean useLegacySql,
-        String extractDestinationDir,
-        BigQueryServices bqServices) {
-      return new BigQueryQuerySource(
-          jobIdToken,
-          query,
-          queryTempTableRef,
-          flattenResults,
-          useLegacySql,
-          extractDestinationDir,
-          bqServices);
-    }
-
-    private final ValueProvider<String> query;
-    private final ValueProvider<String> jsonQueryTempTable;
-    private final Boolean flattenResults;
-    private final Boolean useLegacySql;
-    private transient AtomicReference<JobStatistics> dryRunJobStats;
-
-    private BigQueryQuerySource(
-        ValueProvider<String> jobIdToken,
-        ValueProvider<String> query,
-        ValueProvider<TableReference> queryTempTableRef,
-        Boolean flattenResults,
-        Boolean useLegacySql,
-        String extractDestinationDir,
-        BigQueryServices bqServices) {
-      super(jobIdToken, extractDestinationDir, bqServices,
-          NestedValueProvider.of(
-              checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId()));
-      this.query = checkNotNull(query, "query");
-      this.jsonQueryTempTable = NestedValueProvider.of(
-          queryTempTableRef, new TableRefToJson());
-      this.flattenResults = checkNotNull(flattenResults, "flattenResults");
-      this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
-      this.dryRunJobStats = new AtomicReference<>();
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
-    }
-
-    @Override
-    public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      return new BigQueryReader(this, bqServices.getReaderFromQuery(
-          bqOptions, executingProject.get(), createBasicQueryConfig()));
-    }
-
-    @Override
-    protected TableReference getTableToExtract(BigQueryOptions bqOptions)
-        throws IOException, InterruptedException {
-      // 1. Find the location of the query.
-      String location = null;
-      List<TableReference> referencedTables =
-          dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
-      DatasetService tableService = bqServices.getDatasetService(bqOptions);
-      if (referencedTables != null && !referencedTables.isEmpty()) {
-        TableReference queryTable = referencedTables.get(0);
-        location = tableService.getTable(queryTable).getLocation();
-      }
-
-      // 2. Create the temporary dataset in the query location.
-      TableReference tableToExtract =
-          JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
-      tableService.createDataset(
-          tableToExtract.getProjectId(),
-          tableToExtract.getDatasetId(),
-          location,
-          "Dataset for BigQuery query job temporary table");
-
-      // 3. Execute the query.
-      String queryJobId = jobIdToken.get() + "-query";
-      executeQuery(
-          executingProject.get(),
-          queryJobId,
-          tableToExtract,
-          bqServices.getJobService(bqOptions));
-      return tableToExtract;
-    }
-
-    @Override
-    protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
-      checkState(jsonQueryTempTable.isAccessible());
-      TableReference tableToRemove =
-          JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
-
-      DatasetService tableService = bqServices.getDatasetService(bqOptions);
-      tableService.deleteTable(tableToRemove);
-      tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder.add(DisplayData.item("query", query));
-    }
-
-    private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
-        throws InterruptedException, IOException {
-      if (dryRunJobStats.get() == null) {
-        JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
-            executingProject.get(), createBasicQueryConfig());
-        dryRunJobStats.compareAndSet(null, jobStats);
-      }
-      return dryRunJobStats.get();
-    }
-
-    private void executeQuery(
-        String executingProject,
-        String jobId,
-        TableReference destinationTable,
-        JobService jobService) throws IOException, InterruptedException {
-      JobReference jobRef = new JobReference()
-          .setProjectId(executingProject)
-          .setJobId(jobId);
-
-      JobConfigurationQuery queryConfig = createBasicQueryConfig()
-          .setAllowLargeResults(true)
-          .setCreateDisposition("CREATE_IF_NEEDED")
-          .setDestinationTable(destinationTable)
-          .setPriority("BATCH")
-          .setWriteDisposition("WRITE_EMPTY");
-
-      jobService.startQueryJob(jobRef, queryConfig);
-      Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
-      if (parseStatus(job) != Status.SUCCEEDED) {
-        throw new IOException(String.format(
-            "Query job %s failed, status: %s.", jobId, statusToPrettyString(job.getStatus())));
-      }
-    }
-
-    private JobConfigurationQuery createBasicQueryConfig() {
-      return new JobConfigurationQuery()
-          .setFlattenResults(flattenResults)
-          .setQuery(query.get())
-          .setUseLegacySql(useLegacySql);
-    }
-
-    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
-      in.defaultReadObject();
-      dryRunJobStats = new AtomicReference<>();
-    }
-  }
-
-  /**
-   * An abstract {@link BoundedSource} to read a table from BigQuery.
-   *
-   * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
-   * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource},
-   * and {@link BigQueryQuerySource}, depending on the configuration of the read.
-   * Specifically,
-   * <ul>
-   * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
-   * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
-   * </ul>
-   * ...
-   */
-  private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
-    // The maximum number of retries to poll a BigQuery job.
-    protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
-    protected final ValueProvider<String> jobIdToken;
-    protected final String extractDestinationDir;
-    protected final BigQueryServices bqServices;
-    protected final ValueProvider<String> executingProject;
-
-    private BigQuerySourceBase(
-        ValueProvider<String> jobIdToken,
-        String extractDestinationDir,
-        BigQueryServices bqServices,
-        ValueProvider<String> executingProject) {
-      this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
-      this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
-      this.bqServices = checkNotNull(bqServices, "bqServices");
-      this.executingProject = checkNotNull(executingProject, "executingProject");
-    }
-
-    @Override
-    public List<BoundedSource<TableRow>> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-      TableReference tableToExtract = getTableToExtract(bqOptions);
-      JobService jobService = bqServices.getJobService(bqOptions);
-      String extractJobId = getExtractJobId(jobIdToken);
-      List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
-
-      TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
-          .getTable(tableToExtract).getSchema();
-
-      cleanupTempResource(bqOptions);
-      return createSources(tempFiles, tableSchema);
-    }
-
-    protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
-
-    protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
-
-    @Override
-    public void validate() {
-      // Do nothing, validation is done in BigQuery.Read.
-    }
-
-    @Override
-    public Coder<TableRow> getDefaultOutputCoder() {
-      return TableRowJsonCoder.of();
-    }
-
-    private List<String> executeExtract(
-        String jobId, TableReference table, JobService jobService)
-            throws InterruptedException, IOException {
-      JobReference jobRef = new JobReference()
-          .setProjectId(executingProject.get())
-          .setJobId(jobId);
-
-      String destinationUri = getExtractDestinationUri(extractDestinationDir);
-      JobConfigurationExtract extract = new JobConfigurationExtract()
-          .setSourceTable(table)
-          .setDestinationFormat("AVRO")
-          .setDestinationUris(ImmutableList.of(destinationUri));
-
-      LOG.info("Starting BigQuery extract job: {}", jobId);
-      jobService.startExtractJob(jobRef, extract);
-      Job extractJob =
-          jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
-      if (parseStatus(extractJob) != Status.SUCCEEDED) {
-        throw new IOException(String.format(
-            "Extract job %s failed, status: %s.",
-            extractJob.getJobReference().getJobId(), statusToPrettyString(extractJob.getStatus())));
-      }
-
-      List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
-      return ImmutableList.copyOf(tempFiles);
-    }
-
-    private List<BoundedSource<TableRow>> createSources(
-        List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
-      final String jsonSchema = JSON_FACTORY.toString(tableSchema);
-
-      SerializableFunction<GenericRecord, TableRow> function =
-          new SerializableFunction<GenericRecord, TableRow>() {
-            @Override
-            public TableRow apply(GenericRecord input) {
-              return BigQueryAvroUtils.convertGenericRecordToTableRow(
-                  input, fromJsonString(jsonSchema, TableSchema.class));
-            }};
-
-      List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
-      for (String fileName : files) {
-        avroSources.add(new TransformingSource<>(
-            AvroSource.from(fileName), function, getDefaultOutputCoder()));
-      }
-      return ImmutableList.copyOf(avroSources);
-    }
-
-    protected static class BigQueryReader extends BoundedSource.BoundedReader<TableRow> {
-      private final BigQuerySourceBase source;
-      private final BigQueryServices.BigQueryJsonReader reader;
-
-      private BigQueryReader(
-          BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
-        this.source = source;
-        this.reader = reader;
-      }
-
-      @Override
-      public BoundedSource<TableRow> getCurrentSource() {
-        return source;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return reader.start();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return reader.advance();
-      }
-
-      @Override
-      public TableRow getCurrent() throws NoSuchElementException {
-        return reader.getCurrent();
-      }
-
-      @Override
-      public void close() throws IOException {
-        reader.close();
-      }
-    }
-  }
-
-  /**
-   * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
-   * and transforms elements to type {@code V}.
-  */
-  @VisibleForTesting
-  static class TransformingSource<T, V> extends BoundedSource<V> {
-    private final BoundedSource<T> boundedSource;
-    private final SerializableFunction<T, V> function;
-    private final Coder<V> outputCoder;
-
-    TransformingSource(
-        BoundedSource<T> boundedSource,
-        SerializableFunction<T, V> function,
-        Coder<V> outputCoder) {
-      this.boundedSource = checkNotNull(boundedSource, "boundedSource");
-      this.function = checkNotNull(function, "function");
-      this.outputCoder = checkNotNull(outputCoder, "outputCoder");
-    }
-
-    @Override
-    public List<? extends BoundedSource<V>> splitIntoBundles(
-        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-      return Lists.transform(
-          boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
-          new Function<BoundedSource<T>, BoundedSource<V>>() {
-            @Override
-            public BoundedSource<V> apply(BoundedSource<T> input) {
-              return new TransformingSource<>(input, function, outputCoder);
-            }
-          });
-    }
-
-    @Override
-    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-      return boundedSource.getEstimatedSizeBytes(options);
-    }
-
-    @Override
-    public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
-      return new TransformingReader(boundedSource.createReader(options));
-    }
-
-    @Override
-    public void validate() {
-      boundedSource.validate();
-    }
-
-    @Override
-    public Coder<V> getDefaultOutputCoder() {
-      return outputCoder;
-    }
-
-    private class TransformingReader extends BoundedReader<V> {
-      private final BoundedReader<T> boundedReader;
-
-      private TransformingReader(BoundedReader<T> boundedReader) {
-        this.boundedReader = checkNotNull(boundedReader, "boundedReader");
-      }
-
-      @Override
-      public synchronized BoundedSource<V> getCurrentSource() {
-        return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        return boundedReader.start();
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return boundedReader.advance();
-      }
-
-      @Override
-      public V getCurrent() throws NoSuchElementException {
-        T current = boundedReader.getCurrent();
-        return function.apply(current);
-      }
-
-      @Override
-      public void close() throws IOException {
-        boundedReader.close();
-      }
-
-      @Override
-      public synchronized BoundedSource<V> splitAtFraction(double fraction) {
-        BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
-        return split == null ? null : new TransformingSource<>(split, function, outputCoder);
-      }
-
-      @Override
-      public Double getFractionConsumed() {
-        return boundedReader.getFractionConsumed();
-      }
-
-      @Override
-      public Instant getCurrentTimestamp() throws NoSuchElementException {
-        return boundedReader.getCurrentTimestamp();
-      }
-    }
-  }
-
-  private static String getExtractJobId(ValueProvider<String> jobIdToken) {
+  static String getExtractJobId(ValueProvider<String> jobIdToken) {
     return jobIdToken.get() + "-extract";
   }
 
-  private static String getExtractDestinationUri(String extractDestinationDir) {
+  static String getExtractDestinationUri(String extractDestinationDir) {
     return String.format("%s/%s", extractDestinationDir, "*.avro");
   }
 
-  private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+  static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
       throws IOException {
     JobStatistics jobStats = extractJob.getStatistics();
     List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
@@ -1538,7 +864,8 @@ public class BigQueryIO {
     }
 
     /**
-     * Writes to the given table, specified in the format described in {@link #parseTableSpec}.
+     * Writes to the given table, specified in the format described in
+     * {@link BigQueryHelpers#parseTableSpec}.
      */
     public Write<T> to(String tableSpec) {
       return to(StaticValueProvider.of(tableSpec));
@@ -1546,7 +873,7 @@ public class BigQueryIO {
 
     /** Writes to the given table, specified as a {@link TableReference}. */
     public Write<T> to(TableReference table) {
-      return to(StaticValueProvider.of(toTableSpec(table)));
+      return to(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
     }
 
     /** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
@@ -1596,7 +923,7 @@ public class BigQueryIO {
 
       @Override
       public TableReference apply(ValueInSingleWindow<T> value) {
-        return parseTableSpec(tableSpecFunction.apply(value));
+        return BigQueryHelpers.parseTableSpec(tableSpecFunction.apply(value));
       }
     }
 
@@ -1609,7 +936,7 @@ public class BigQueryIO {
      */
     public Write<T> withSchema(TableSchema schema) {
       return toBuilder()
-          .setJsonSchema(StaticValueProvider.of(toJsonString(schema)))
+          .setJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema)))
           .build();
     }
 
@@ -1655,7 +982,7 @@ public class BigQueryIO {
           checkState(
               datasetService.isTableEmpty(tableRef),
               "BigQuery table is not empty: %s.",
-              BigQueryIO.toTableSpec(tableRef));
+              BigQueryHelpers.toTableSpec(tableRef));
         }
       } catch (IOException | InterruptedException e) {
         if (e instanceof InterruptedException) {
@@ -1663,7 +990,7 @@ public class BigQueryIO {
         }
         throw new RuntimeException(
             "unable to confirm BigQuery table emptiness for table "
-                + BigQueryIO.toTableSpec(tableRef), e);
+                + BigQueryHelpers.toTableSpec(tableRef), e);
       }
     }
 
@@ -1758,7 +1085,7 @@ public class BigQueryIO {
 
       ValueProvider<TableReference> table = getTableWithDefaultProject(options);
 
-      final String stepUuid = randomUUIDString();
+      final String stepUuid = BigQueryHelpers.randomUUIDString();
 
       String tempLocation = options.getTempLocation();
       String tempFilePrefix;
@@ -1953,7 +1280,7 @@ public class BigQueryIO {
 
     /** Returns the table schema. */
     public TableSchema getSchema() {
-      return fromJsonString(
+      return BigQueryHelpers.fromJsonString(
           getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class);
     }
 
@@ -1979,7 +1306,7 @@ public class BigQueryIO {
         TableReference tableRef = table.get();
         tableRef.setProjectId(bqOptions.getProject());
         return NestedValueProvider.of(StaticValueProvider.of(
-            toJsonString(tableRef)), new JsonTableRefToTableRef());
+            BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef());
       }
       return table;
     }
@@ -2128,7 +1455,8 @@ public class BigQueryIO {
         List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
         String jobIdPrefix = String.format(
             c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
-        TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class);
+        TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
+            TableReference.class);
         if (!singlePartition) {
           ref.setTableId(jobIdPrefix);
         }
@@ -2138,13 +1466,13 @@ public class BigQueryIO {
             bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
             jobIdPrefix,
             ref,
-            fromJsonString(
+            BigQueryHelpers.fromJsonString(
                 jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
             partition,
             writeDisposition,
             createDisposition,
             tableDescription);
-        c.output(toJsonString(ref));
+        c.output(BigQueryHelpers.toJsonString(ref));
 
         removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
       }
@@ -2176,7 +1504,7 @@ public class BigQueryIO {
               .setJobId(jobId);
           jobService.startLoadJob(jobRef, loadConfig);
           Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
-          Status jobStatus = parseStatus(loadJob);
+          Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
           switch (jobStatus) {
             case SUCCEEDED:
               if (tableDescription != null) {
@@ -2185,14 +1513,15 @@ public class BigQueryIO {
               return;
             case UNKNOWN:
               throw new RuntimeException(String.format(
-                  "UNKNOWN status of load job [%s]: %s.", jobId, jobToPrettyString(loadJob)));
+                  "UNKNOWN status of load job [%s]: %s.", jobId,
+                  BigQueryHelpers.jobToPrettyString(loadJob)));
             case FAILED:
               lastFailedLoadJob = loadJob;
               continue;
             default:
               throw new IllegalStateException(String.format(
                   "Unexpected status [%s] of load job: %s.",
-                  jobStatus, jobToPrettyString(loadJob)));
+                  jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
           }
         }
         throw new RuntimeException(String.format(
@@ -2200,7 +1529,7 @@ public class BigQueryIO {
                 + "reached max retries: %d, last failed load job: %s.",
             jobIdPrefix,
             Write.MAX_RETRY_JOBS,
-            jobToPrettyString(lastFailedLoadJob)));
+            BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
       }
 
       static void removeTemporaryFiles(
@@ -2281,13 +1610,13 @@ public class BigQueryIO {
 
         List<TableReference> tempTables = Lists.newArrayList();
         for (String table : tempTablesJson) {
-          tempTables.add(fromJsonString(table, TableReference.class));
+          tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
         }
         copy(
             bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
             bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
             c.sideInput(jobIdToken),
-            fromJsonString(jsonTableRef.get(), TableReference.class),
+            BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class),
             tempTables,
             writeDisposition,
             createDisposition,
@@ -2322,7 +1651,7 @@ public class BigQueryIO {
               .setJobId(jobId);
           jobService.startCopyJob(jobRef, copyConfig);
           Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
-          Status jobStatus = parseStatus(copyJob);
+          Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
           switch (jobStatus) {
             case SUCCEEDED:
               if (tableDescription != null) {
@@ -2331,14 +1660,15 @@ public class BigQueryIO {
               return;
             case UNKNOWN:
               throw new RuntimeException(String.format(
-                  "UNKNOWN status of copy job [%s]: %s.", jobId, jobToPrettyString(copyJob)));
+                  "UNKNOWN status of copy job [%s]: %s.", jobId,
+                  BigQueryHelpers.jobToPrettyString(copyJob)));
             case FAILED:
               lastFailedCopyJob = copyJob;
               continue;
             default:
               throw new IllegalStateException(String.format(
                   "Unexpected status [%s] of load job: %s.",
-                  jobStatus, jobToPrettyString(copyJob)));
+                  jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
           }
         }
         throw new RuntimeException(String.format(
@@ -2346,17 +1676,17 @@ public class BigQueryIO {
                 + "reached max retries: %d, last failed copy job: %s.",
             jobIdPrefix,
             Write.MAX_RETRY_JOBS,
-            jobToPrettyString(lastFailedCopyJob)));
+            BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
       }
 
       static void removeTemporaryTables(DatasetService tableService,
           List<TableReference> tempTables) {
         for (TableReference tableRef : tempTables) {
           try {
-            LOG.debug("Deleting table {}", toJsonString(tableRef));
+            LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
             tableService.deleteTable(tableRef);
           } catch (Exception e) {
-            LOG.warn("Failed to delete the table {}", toJsonString(tableRef), e);
+            LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
           }
         }
       }
@@ -2376,14 +1706,6 @@ public class BigQueryIO {
     }
   }
 
-  private static String jobToPrettyString(@Nullable Job job) throws IOException {
-    return job == null ? "null" : job.toPrettyString();
-  }
-
-  private static String statusToPrettyString(@Nullable JobStatus status) throws IOException {
-    return status == null ? "Unknown status: null." : status.toPrettyString();
-  }
-
   private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
     try {
       datasetService.getDataset(table.getProjectId(), table.getDatasetId());
@@ -2391,14 +1713,14 @@ public class BigQueryIO {
       ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
       if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
         throw new IllegalArgumentException(
-            String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)),
+            String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryHelpers.toTableSpec(table)),
             e);
       } else if (e instanceof  RuntimeException) {
         throw (RuntimeException) e;
       } else {
         throw new RuntimeException(
             String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset",
-                BigQueryIO.toTableSpec(table)),
+                BigQueryHelpers.toTableSpec(table)),
             e);
       }
     }
@@ -2411,13 +1733,14 @@ public class BigQueryIO {
       ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
       if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
         throw new IllegalArgumentException(
-            String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e);
+            String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryHelpers.toTableSpec(table)),
+            e);
       } else if (e instanceof  RuntimeException) {
         throw (RuntimeException) e;
       } else {
         throw new RuntimeException(
             String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table",
-                BigQueryIO.toTableSpec(table)),
+                BigQueryHelpers.toTableSpec(table)),
             e);
       }
     }
@@ -2492,8 +1815,9 @@ public class BigQueryIO {
     @ProcessElement
     public void processElement(ProcessContext context) {
       String tableSpec = context.element().getKey().getKey();
-      List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec);
-      List<String> uniqueIds = getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec);
+      List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec);
+      List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows,
+          tableSpec);
 
       rows.add(context.element().getValue().tableRow);
       uniqueIds.add(context.element().getValue().uniqueId);
@@ -2526,7 +1850,7 @@ public class BigQueryIO {
 
     public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
         throws InterruptedException, IOException {
-      TableReference tableReference = parseTableSpec(tableSpec);
+      TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
       if (createDisposition != createDisposition.CREATE_NEVER
           && !createdTables.contains(tableSpec)) {
         synchronized (createdTables) {
@@ -2723,7 +2047,7 @@ public class BigQueryIO {
           TableReference tableRef = table.get()
               .setProjectId(options.as(BigQueryOptions.class).getProject());
           table = NestedValueProvider.of(
-              StaticValueProvider.of(toJsonString(tableRef)),
+              StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
               new JsonTableRefToTableRef());
         }
         this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
@@ -2779,7 +2103,7 @@ public class BigQueryIO {
         if (table.getProjectId() == null) {
           table.setProjectId(options.getProject());
         }
-        return toTableSpec(table);
+        return BigQueryHelpers.toTableSpec(table);
       }
     }
   }
@@ -2858,68 +2182,9 @@ public class BigQueryIO {
     UNKNOWN,
   }
 
-  private static Status parseStatus(@Nullable Job job) {
-    if (job == null) {
-      return Status.UNKNOWN;
-    }
-    JobStatus status = job.getStatus();
-    if (status.getErrorResult() != null) {
-      return Status.FAILED;
-    } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
-      return Status.FAILED;
-    } else {
-      return Status.SUCCEEDED;
-    }
-  }
-
-  @VisibleForTesting
-  static String toJsonString(Object item) {
-    if (item == null) {
-      return null;
-    }
-    try {
-      return JSON_FACTORY.toString(item);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()),
-          e);
-    }
-  }
-
-  @VisibleForTesting
-  static <T> T fromJsonString(String json, Class<T> clazz) {
-    if (json == null) {
-      return null;
-    }
-    try {
-      return JSON_FACTORY.fromString(json, clazz);
-    } catch (IOException e) {
-      throw new RuntimeException(
-          String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json),
-          e);
-    }
-  }
-
-  /**
-   * Returns a randomUUID string.
-   *
-   * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset id.
-   */
-  private static String randomUUIDString() {
-    return UUID.randomUUID().toString().replaceAll("-", "");
-  }
-
   /////////////////////////////////////////////////////////////////////////////
 
   /** Disallow construction of utility class. */
   private BigQueryIO() {}
 
-  private static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
-    List<V> value = map.get(key);
-    if (value == null) {
-      value = new ArrayList<>();
-      map.put(key, value);
-    }
-    return value;
-  }
 }
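
The list-per-key helper deleted above now lives on BigQueryHelpers with package visibility. A minimal sketch of the pattern it supports (key and row values hypothetical):

    Map<String, List<TableRow>> tableRows = new HashMap<>();
    // Returns the existing list for the key, or inserts and returns a fresh ArrayList.
    List<TableRow> rows =
        BigQueryHelpers.getOrCreateMapListValue(tableRows, "my-project:my_dataset.my_table");
    rows.add(new TableRow().set("word", "beam"));
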

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
new file mode 100644
index 0000000..a909957
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -0,0 +1,186 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+
+/**
+ * A {@link BigQuerySourceBase} for querying BigQuery tables.
+ */
+@VisibleForTesting
+class BigQueryQuerySource extends BigQuerySourceBase {
+
+  static BigQueryQuerySource create(
+      ValueProvider<String> jobIdToken,
+      ValueProvider<String> query,
+      ValueProvider<TableReference> queryTempTableRef,
+      Boolean flattenResults,
+      Boolean useLegacySql,
+      String extractDestinationDir,
+      BigQueryServices bqServices) {
+    return new BigQueryQuerySource(
+        jobIdToken,
+        query,
+        queryTempTableRef,
+        flattenResults,
+        useLegacySql,
+        extractDestinationDir,
+        bqServices);
+  }
+
+  private final ValueProvider<String> query;
+  private final ValueProvider<String> jsonQueryTempTable;
+  private final Boolean flattenResults;
+  private final Boolean useLegacySql;
+  private transient AtomicReference<JobStatistics> dryRunJobStats;
+
+  private BigQueryQuerySource(
+      ValueProvider<String> jobIdToken,
+      ValueProvider<String> query,
+      ValueProvider<TableReference> queryTempTableRef,
+      Boolean flattenResults,
+      Boolean useLegacySql,
+      String extractDestinationDir,
+      BigQueryServices bqServices) {
+    super(jobIdToken, extractDestinationDir, bqServices,
+        NestedValueProvider.of(
+            checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId()));
+    this.query = checkNotNull(query, "query");
+    this.jsonQueryTempTable = NestedValueProvider.of(
+        queryTempTableRef, new TableRefToJson());
+    this.flattenResults = checkNotNull(flattenResults, "flattenResults");
+    this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
+    this.dryRunJobStats = new AtomicReference<>();
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
+  }
+
+  @Override
+  public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    return new BigQueryReader(this, bqServices.getReaderFromQuery(
+        bqOptions, executingProject.get(), createBasicQueryConfig()));
+  }
+
+  @Override
+  protected TableReference getTableToExtract(BigQueryOptions bqOptions)
+      throws IOException, InterruptedException {
+    // 1. Find the location of the query.
+    String location = null;
+    List<TableReference> referencedTables =
+        dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+    DatasetService tableService = bqServices.getDatasetService(bqOptions);
+    if (referencedTables != null && !referencedTables.isEmpty()) {
+      TableReference queryTable = referencedTables.get(0);
+      location = tableService.getTable(queryTable).getLocation();
+    }
+
+    // 2. Create the temporary dataset in the query location.
+    TableReference tableToExtract =
+        BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+    tableService.createDataset(
+        tableToExtract.getProjectId(),
+        tableToExtract.getDatasetId(),
+        location,
+        "Dataset for BigQuery query job temporary table");
+
+    // 3. Execute the query.
+    String queryJobId = jobIdToken.get() + "-query";
+    executeQuery(
+        executingProject.get(),
+        queryJobId,
+        tableToExtract,
+        bqServices.getJobService(bqOptions));
+    return tableToExtract;
+  }
+
+  @Override
+  protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+    checkState(jsonQueryTempTable.isAccessible());
+    TableReference tableToRemove =
+        BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+
+    DatasetService tableService = bqServices.getDatasetService(bqOptions);
+    tableService.deleteTable(tableToRemove);
+    tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder.add(DisplayData.item("query", query));
+  }
+
+  private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
+      throws InterruptedException, IOException {
+    if (dryRunJobStats.get() == null) {
+      JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
+          executingProject.get(), createBasicQueryConfig());
+      dryRunJobStats.compareAndSet(null, jobStats);
+    }
+    return dryRunJobStats.get();
+  }
+
+  private void executeQuery(
+      String executingProject,
+      String jobId,
+      TableReference destinationTable,
+      JobService jobService) throws IOException, InterruptedException {
+    JobReference jobRef = new JobReference()
+        .setProjectId(executingProject)
+        .setJobId(jobId);
+
+    JobConfigurationQuery queryConfig = createBasicQueryConfig()
+        .setAllowLargeResults(true)
+        .setCreateDisposition("CREATE_IF_NEEDED")
+        .setDestinationTable(destinationTable)
+        .setPriority("BATCH")
+        .setWriteDisposition("WRITE_EMPTY");
+
+    jobService.startQueryJob(jobRef, queryConfig);
+    Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+    if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
+      throw new IOException(String.format(
+          "Query job %s failed, status: %s.", jobId,
+          BigQueryHelpers.statusToPrettyString(job.getStatus())));
+    }
+  }
+
+  private JobConfigurationQuery createBasicQueryConfig() {
+    return new JobConfigurationQuery()
+        .setFlattenResults(flattenResults)
+        .setQuery(query.get())
+        .setUseLegacySql(useLegacySql);
+  }
+
+  private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+    in.defaultReadObject();
+    dryRunJobStats = new AtomicReference<>();
+  }
+}
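
A construction sketch for this source, with hypothetical values throughout; the real wiring happens inside BigQueryIO.Read, and the class is package-private:

    BigQueryQuerySource source = BigQueryQuerySource.create(
        StaticValueProvider.of("beam_job_1234"),                  // jobIdToken
        StaticValueProvider.of("SELECT word, word_count FROM [corpus]"),
        StaticValueProvider.of(new TableReference()               // queryTempTableRef
            .setProjectId("my-project")
            .setDatasetId("beam_temp_dataset")
            .setTableId("beam_temp_table")),
        true /* flattenResults */,
        true /* useLegacySql */,
        "gs://my-bucket/extract" /* extractDestinationDir */,
        new BigQueryServicesImpl());
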

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 15ca262..c8e6ed8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -444,7 +444,7 @@ class BigQueryServicesImpl implements BigQueryServices {
     @Override
     public void createTable(Table table) throws InterruptedException, IOException {
       LOG.info("Trying to create BigQuery table: {}",
-          BigQueryIO.toTableSpec(table.getTableReference()));
+          BigQueryHelpers.toTableSpec(table.getTableReference()));
       BackOff backoff =
               new ExponentialBackOff.Builder()
                       .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
new file mode 100644
index 0000000..ff50e6d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -0,0 +1,178 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.io.AvroSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract {@link BoundedSource} to read a table from BigQuery.
+ *
+ * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
+ * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource}
+ * and {@link BigQueryQuerySource}, depending on the configuration of the read.
+ * Specifically,
+ * <ul>
+ * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
+ * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
+ * </ul>
+ */
+abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class);
+
+  // The maximum number of retries to poll a BigQuery job.
+  protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+  protected final ValueProvider<String> jobIdToken;
+  protected final String extractDestinationDir;
+  protected final BigQueryServices bqServices;
+  protected final ValueProvider<String> executingProject;
+
+  BigQuerySourceBase(
+      ValueProvider<String> jobIdToken,
+      String extractDestinationDir,
+      BigQueryServices bqServices,
+      ValueProvider<String> executingProject) {
+    this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+    this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
+    this.bqServices = checkNotNull(bqServices, "bqServices");
+    this.executingProject = checkNotNull(executingProject, "executingProject");
+  }
+
+  @Override
+  public List<BoundedSource<TableRow>> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    TableReference tableToExtract = getTableToExtract(bqOptions);
+    JobService jobService = bqServices.getJobService(bqOptions);
+    String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
+    List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+    TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
+        .getTable(tableToExtract).getSchema();
+
+    cleanupTempResource(bqOptions);
+    return createSources(tempFiles, tableSchema);
+  }
+
+  protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
+
+  protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
+
+  @Override
+  public void validate() {
+    // Do nothing; validation is done in BigQueryIO.Read.
+  }
+
+  @Override
+  public Coder<TableRow> getDefaultOutputCoder() {
+    return TableRowJsonCoder.of();
+  }
+
+  private List<String> executeExtract(
+      String jobId, TableReference table, JobService jobService)
+          throws InterruptedException, IOException {
+    JobReference jobRef = new JobReference()
+        .setProjectId(executingProject.get())
+        .setJobId(jobId);
+
+    String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
+    JobConfigurationExtract extract = new JobConfigurationExtract()
+        .setSourceTable(table)
+        .setDestinationFormat("AVRO")
+        .setDestinationUris(ImmutableList.of(destinationUri));
+
+    LOG.info("Starting BigQuery extract job: {}", jobId);
+    jobService.startExtractJob(jobRef, extract);
+    Job extractJob =
+        jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+    if (BigQueryHelpers.parseStatus(extractJob) != Status.SUCCEEDED) {
+      throw new IOException(String.format(
+          "Extract job %s failed, status: %s.",
+          extractJob.getJobReference().getJobId(),
+          BigQueryHelpers.statusToPrettyString(extractJob.getStatus())));
+    }
+
+    List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
+    return ImmutableList.copyOf(tempFiles);
+  }
+
+  private List<BoundedSource<TableRow>> createSources(
+      List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
+
+    SerializableFunction<GenericRecord, TableRow> function =
+        new SerializableFunction<GenericRecord, TableRow>() {
+          @Override
+          public TableRow apply(GenericRecord input) {
+            return BigQueryAvroUtils.convertGenericRecordToTableRow(
+                input, BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class));
+          }};
+
+    List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
+    for (String fileName : files) {
+      avroSources.add(new TransformingSource<>(
+          AvroSource.from(fileName), function, getDefaultOutputCoder()));
+    }
+    return ImmutableList.copyOf(avroSources);
+  }
+
+  protected static class BigQueryReader extends BoundedReader<TableRow> {
+    private final BigQuerySourceBase source;
+    private final BigQueryServices.BigQueryJsonReader reader;
+
+    BigQueryReader(
+        BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
+      this.source = source;
+      this.reader = reader;
+    }
+
+    @Override
+    public BoundedSource<TableRow> getCurrentSource() {
+      return source;
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return reader.start();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      return reader.advance();
+    }
+
+    @Override
+    public TableRow getCurrent() throws NoSuchElementException {
+      return reader.getCurrent();
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+  }
+}
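
The per-file adaptation in createSources can be sketched in isolation. Assuming a TableSchema named tableSchema and a hypothetical extract path, each exported Avro file is wrapped so its GenericRecords surface as TableRows; the schema travels as a JSON string because TableSchema itself is not Serializable:

    final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
    BoundedSource<TableRow> source = new TransformingSource<>(
        AvroSource.from("gs://my-bucket/extract/000000000000.avro"),  // hypothetical path
        new SerializableFunction<GenericRecord, TableRow>() {
          @Override
          public TableRow apply(GenericRecord record) {
            // Re-parse the schema on the worker, as createSources does above.
            return BigQueryAvroUtils.convertGenericRecordToTableRow(
                record, BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class));
          }
        },
        TableRowJsonCoder.of());
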

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
new file mode 100644
index 0000000..aae0faa
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -0,0 +1,86 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/**
+ * A {@link BigQuerySourceBase} for reading BigQuery tables.
+ */
+@VisibleForTesting
+class BigQueryTableSource extends BigQuerySourceBase {
+
+  static BigQueryTableSource create(
+      ValueProvider<String> jobIdToken,
+      ValueProvider<TableReference> table,
+      String extractDestinationDir,
+      BigQueryServices bqServices,
+      ValueProvider<String> executingProject) {
+    return new BigQueryTableSource(
+        jobIdToken, table, extractDestinationDir, bqServices, executingProject);
+  }
+
+  private final ValueProvider<String> jsonTable;
+  private final AtomicReference<Long> tableSizeBytes;
+
+  private BigQueryTableSource(
+      ValueProvider<String> jobIdToken,
+      ValueProvider<TableReference> table,
+      String extractDestinationDir,
+      BigQueryServices bqServices,
+      ValueProvider<String> executingProject) {
+    super(jobIdToken, extractDestinationDir, bqServices, executingProject);
+    this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
+    this.tableSizeBytes = new AtomicReference<>();
+  }
+
+  @Override
+  protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
+    checkState(jsonTable.isAccessible());
+    return BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
+  }
+
+  @Override
+  public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+    checkState(jsonTable.isAccessible());
+    TableReference tableRef = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
+        TableReference.class);
+    return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
+  }
+
+  @Override
+  public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    if (tableSizeBytes.get() == null) {
+      TableReference table = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
+          TableReference.class);
+
+      Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
+          .getTable(table).getNumBytes();
+      tableSizeBytes.compareAndSet(null, numBytes);
+    }
+    return tableSizeBytes.get();
+  }
+
+  @Override
+  protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+    // Do nothing.
+  }
+
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder.add(DisplayData.item("table", jsonTable));
+  }
+}
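
The matching construction sketch for the table case (values hypothetical; the class is package-private):

    BigQueryTableSource source = BigQueryTableSource.create(
        StaticValueProvider.of("beam_job_1234"),                  // jobIdToken
        StaticValueProvider.of(
            BigQueryHelpers.parseTableSpec("my-project:my_dataset.my_table")),
        "gs://my-bucket/extract",                                 // extractDestinationDir
        new BigQueryServicesImpl(),
        StaticValueProvider.of("my-project"));                    // executingProject
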

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
new file mode 100644
index 0000000..612afbe
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -0,0 +1,66 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
+ * has been processed.
+ */
+@VisibleForTesting
+class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
+  private final CleanupOperation cleanupOperation;
+
+  PassThroughThenCleanup(CleanupOperation cleanupOperation) {
+    this.cleanupOperation = cleanupOperation;
+  }
+
+  @Override
+  public PCollection<T> expand(PCollection<T> input) {
+    TupleTag<T> mainOutput = new TupleTag<>();
+    TupleTag<Void> cleanupSignal = new TupleTag<>();
+    PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
+        .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
+
+    PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
+        .setCoder(VoidCoder.of())
+        .apply(View.<Void>asSingleton().withDefaultValue(null));
+
+    input.getPipeline()
+        .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
+        .apply("Cleanup", ParDo.of(
+            new DoFn<CleanupOperation, Void>() {
+              @ProcessElement
+              public void processElement(ProcessContext c)
+                  throws Exception {
+                c.element().cleanup(c.getPipelineOptions());
+              }
+            }).withSideInputs(cleanupSignalView));
+
+    return outputs.get(mainOutput);
+  }
+
+  private static class IdentityFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element());
+    }
+  }
+
+  abstract static class CleanupOperation implements Serializable {
+    abstract void cleanup(PipelineOptions options) throws Exception;
+  }
+}
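
A usage sketch, assuming an existing PCollection<TableRow> named rows and a hypothetical cleanup body; the rows pass through unchanged, and the hook fires once the side-input signal is available (CleanupOperation is package-private, so this assumes same-package code):

    PCollection<TableRow> cleaned = rows.apply(
        new PassThroughThenCleanup<TableRow>(
            new PassThroughThenCleanup.CleanupOperation() {
              @Override
              void cleanup(PipelineOptions options) throws Exception {
                // Hypothetical: delete temporary extract files, e.g. under
                // "gs://my-bucket/extract/", after the input has been processed.
              }
            }));
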

http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
new file mode 100644
index 0000000..a86adfb
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
@@ -0,0 +1,118 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Instant;
+
+/**
+ * A {@link BoundedSource} that reads from an underlying {@code BoundedSource<T>}
+ * and transforms each element to type {@code V}.
+ */
+@VisibleForTesting
+class TransformingSource<T, V> extends BoundedSource<V> {
+  private final BoundedSource<T> boundedSource;
+  private final SerializableFunction<T, V> function;
+  private final Coder<V> outputCoder;
+
+  TransformingSource(
+      BoundedSource<T> boundedSource,
+      SerializableFunction<T, V> function,
+      Coder<V> outputCoder) {
+    this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+    this.function = checkNotNull(function, "function");
+    this.outputCoder = checkNotNull(outputCoder, "outputCoder");
+  }
+
+  @Override
+  public List<? extends BoundedSource<V>> splitIntoBundles(
+      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+    return Lists.transform(
+        boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
+        new Function<BoundedSource<T>, BoundedSource<V>>() {
+          @Override
+          public BoundedSource<V> apply(BoundedSource<T> input) {
+            return new TransformingSource<>(input, function, outputCoder);
+          }
+        });
+  }
+
+  @Override
+  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+    return boundedSource.getEstimatedSizeBytes(options);
+  }
+
+  @Override
+  public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
+    return new TransformingReader(boundedSource.createReader(options));
+  }
+
+  @Override
+  public void validate() {
+    boundedSource.validate();
+  }
+
+  @Override
+  public Coder<V> getDefaultOutputCoder() {
+    return outputCoder;
+  }
+
+  private class TransformingReader extends BoundedReader<V> {
+    private final BoundedReader<T> boundedReader;
+
+    private TransformingReader(BoundedReader<T> boundedReader) {
+      this.boundedReader = checkNotNull(boundedReader, "boundedReader");
+    }
+
+    @Override
+    public synchronized BoundedSource<V> getCurrentSource() {
+      return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      return boundedReader.start();
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      return boundedReader.advance();
+    }
+
+    @Override
+    public V getCurrent() throws NoSuchElementException {
+      T current = boundedReader.getCurrent();
+      return function.apply(current);
+    }
+
+    @Override
+    public void close() throws IOException {
+      boundedReader.close();
+    }
+
+    @Override
+    public synchronized BoundedSource<V> splitAtFraction(double fraction) {
+      BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
+      return split == null ? null : new TransformingSource<>(split, function, outputCoder);
+    }
+
+    @Override
+    public Double getFractionConsumed() {
+      return boundedReader.getFractionConsumed();
+    }
+
+    @Override
+    public Instant getCurrentTimestamp() throws NoSuchElementException {
+      return boundedReader.getCurrentTimestamp();
+    }
+  }
+}
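
A minimal sketch of the wrapper on its own, using CountingSource purely as a stand-in input (the mapping and names are hypothetical):

    BoundedSource<String> strings = new TransformingSource<>(
        CountingSource.upTo(100),                   // a BoundedSource<Long>
        new SerializableFunction<Long, String>() {
          @Override
          public String apply(Long input) {
            return "row-" + input;                  // Long -> String
          }
        },
        StringUtf8Coder.of());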