You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/13 17:48:44 UTC

[3/3] incubator-beam git commit: [BEAM-48] Implement BigQueryIO.Read as Source

[BEAM-48] Implement BigQueryIO.Read as Source


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/987350b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/987350b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/987350b7

Branch: refs/heads/master
Commit: 987350b7469f7dd04d9bc4c2145516b579297f61
Parents: 93a5d39
Author: Pei He <pe...@google.com>
Authored: Wed Apr 13 12:03:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 13 10:48:27 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/BigQueryIO.java | 877 ++++++++++++++++---
 .../apache/beam/sdk/util/BigQueryServices.java  | 109 ++-
 .../beam/sdk/util/BigQueryServicesImpl.java     | 347 ++++++--
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  | 550 ++++++++++--
 .../beam/sdk/util/BigQueryServicesImplTest.java |  40 +-
 5 files changed, 1657 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 15872e5..a5ef39f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -36,21 +36,26 @@ import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.options.BigQueryOptions;
 import org.apache.beam.sdk.options.GcpOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.AvroUtils;
 import org.apache.beam.sdk.util.BigQueryServices;
+import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
 import org.apache.beam.sdk.util.BigQueryServices.JobService;
 import org.apache.beam.sdk.util.BigQueryServicesImpl;
 import org.apache.beam.sdk.util.BigQueryTableInserter;
 import org.apache.beam.sdk.util.BigQueryTableRowIterator;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
 import org.apache.beam.sdk.util.IOChannelFactory;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.MimeTypes;
@@ -58,18 +63,27 @@ import org.apache.beam.sdk.util.PropertyNames;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 
 import com.google.api.client.json.JsonFactory;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.QueryRequest;
 import com.google.api.services.bigquery.model.TableReference;
@@ -77,31 +91,42 @@ import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.avro.generic.GenericRecord;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.ObjectInputStream;
 import java.io.OutputStream;
+import java.io.Serializable;
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -362,27 +387,39 @@ public class BigQueryIO {
      * {@link PCollection} of {@link TableRow TableRows}.
      */
     public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
-      TableReference table;
-      final String query;
+      @Nullable final String jsonTableRef;
+      @Nullable final String query;
       final boolean validate;
-      @Nullable
-      Boolean flattenResults;
+      @Nullable final Boolean flattenResults;
+      @Nullable final BigQueryServices testBigQueryServices;
 
       private static final String QUERY_VALIDATION_FAILURE_ERROR =
           "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
           + " pipeline, This validation can be disabled using #withoutValidation.";
 
+      // The maximum number of retries to poll a BigQuery job in the cleanup phase.
+      // We expect the jobs to already be DONE, so a high max retries is not needed.
+      private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10;
+
       private Bound() {
-        this(null, null, null, true, null);
+        this(
+            null /* name */,
+            null /* query */,
+            null /* jsonTableRef */,
+            true /* validate */,
+            null /* flattenResults */,
+            null /* testBigQueryServices */);
       }
 
-      private Bound(String name, String query, TableReference reference, boolean validate,
-          Boolean flattenResults) {
+      private Bound(
+          String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate,
+          @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) {
         super(name);
-        this.table = reference;
+        this.jsonTableRef = jsonTableRef;
         this.query = query;
         this.validate = validate;
         this.flattenResults = flattenResults;
+        this.testBigQueryServices = testBigQueryServices;
       }
 
       /**
@@ -391,7 +428,7 @@ public class BigQueryIO {
        * <p>Does not modify this object.
        */
       public Bound named(String name) {
-        return new Bound(name, query, table, validate, flattenResults);
+        return new Bound(name, query, jsonTableRef, validate, flattenResults, testBigQueryServices);
       }
 
       /**
@@ -410,7 +447,8 @@ public class BigQueryIO {
        * <p>Does not modify this object.
        */
       public Bound from(TableReference table) {
-        return new Bound(name, query, table, validate, flattenResults);
+        return new Bound(
+            name, query, toJsonString(table), validate, flattenResults, testBigQueryServices);
       }
 
       /**
@@ -424,15 +462,15 @@ public class BigQueryIO {
        * {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
        */
       public Bound fromQuery(String query) {
-        return new Bound(name, query, table, validate,
-            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
+        return new Bound(name, query, jsonTableRef, validate,
+            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), testBigQueryServices);
       }
 
       /**
        * Disable table validation.
        */
       public Bound withoutValidation() {
-        return new Bound(name, query, table, false, flattenResults);
+        return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices);
       }
 
       /**
@@ -443,35 +481,34 @@ public class BigQueryIO {
        * from a table will cause an error during validation.
        */
       public Bound withoutResultFlattening() {
-        return new Bound(name, query, table, validate, false);
+        return new Bound(name, query, jsonTableRef, validate, false, testBigQueryServices);
+      }
+
+      @VisibleForTesting
+      Bound withTestServices(BigQueryServices testServices) {
+        return new Bound(name, query, jsonTableRef, validate, flattenResults, testServices);
       }
 
       @Override
       public void validate(PInput input) {
-        if (table == null && query == null) {
-          throw new IllegalStateException(
-              "Invalid BigQuery read operation, either table reference or query has to be set");
-        } else if (table != null && query != null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
-              + " query and a table, only one of these should be provided");
-        } else if (table != null && flattenResults != null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-              + " table with a result flattening preference, which is not configurable");
-        } else if (query != null && flattenResults == null) {
-          throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
-              + " query without a result flattening preference");
-        }
-
-        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
-        if (table != null && table.getProjectId() == null) {
-          // If user does not specify a project we assume the table to be located in the project
-          // that owns the Dataflow job.
-          LOG.warn(String.format(SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
-              table.getTableId(), bqOptions.getProject()));
-          table.setProjectId(bqOptions.getProject());
-        }
-
         if (validate) {
+          BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+          TableReference table = getTableWithDefaultProject(bqOptions);
+          if (table == null && query == null) {
+            throw new IllegalStateException(
+                "Invalid BigQuery read operation, either table reference or query has to be set");
+          } else if (table != null && query != null) {
+            throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
+                + " query and a table, only one of these should be provided");
+          } else if (table != null && flattenResults != null) {
+            throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+                + " table with a result flattening preference, which is not configurable");
+          } else if (query != null && flattenResults == null) {
+            throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+                + " query without a result flattening preference");
+          }
+
           // Check for source table/query presence for early failure notification.
           // Note that a presence check can fail if the table or dataset are created by earlier
           // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these
@@ -504,14 +541,83 @@ public class BigQueryIO {
 
       @Override
       public PCollection<TableRow> apply(PInput input) {
-        return PCollection.<TableRow>createPrimitiveOutputInternal(
-            input.getPipeline(),
-            WindowingStrategy.globalDefault(),
-            IsBounded.BOUNDED)
-            // Force the output's Coder to be what the read is using, and
-            // unchangeable later, to ensure that we read the input in the
-            // format specified by the Read transform.
-            .setCoder(TableRowJsonCoder.of());
+        String uuid = randomUUIDString();
+        final String jobIdToken = "beam_job_" + uuid;
+
+        BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+        BoundedSource<TableRow> source;
+        final BigQueryServices bqServices = getBigQueryServices();
+
+        final String extractDestinationDir;
+        String tempLocation = bqOptions.getTempLocation();
+        try {
+          IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+          extractDestinationDir = factory.resolve(tempLocation, uuid);
+        } catch (IOException e) {
+          throw new RuntimeException(
+              String.format("Failed to resolve extract destination directory in %s", tempLocation));
+        }
+
+        if (!Strings.isNullOrEmpty(query)) {
+          String projectId = bqOptions.getProject();
+          String queryTempDatasetId = "temp_dataset_" + uuid;
+          String queryTempTableId = "temp_table_" + uuid;
+
+          TableReference queryTempTableRef = new TableReference()
+              .setProjectId(projectId)
+              .setDatasetId(queryTempDatasetId)
+              .setTableId(queryTempTableId);
+
+          String jsonQueryTempTable;
+          try {
+            jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef);
+          } catch (IOException e) {
+            throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+          }
+          source = BigQueryQuerySource.create(
+              jobIdToken, query, jsonQueryTempTable, flattenResults,
+              extractDestinationDir, bqServices);
+        } else {
+          String jsonTable;
+          try {
+            jsonTable = JSON_FACTORY.toString(getTableWithDefaultProject(bqOptions));
+          } catch (IOException e) {
+            throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+          }
+          source = BigQueryTableSource.create(
+              jobIdToken, jsonTable, extractDestinationDir, bqServices);
+        }
+        PassThroughThenCleanup.CleanupOperation cleanupOperation =
+            new PassThroughThenCleanup.CleanupOperation() {
+              @Override
+              void cleanup(PipelineOptions options) throws Exception {
+                BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+
+                JobReference jobRef = new JobReference()
+                    .setProjectId(bqOptions.getProject())
+                    .setJobId(getExtractJobId(jobIdToken));
+                Job extractJob = bqServices.getJobService(bqOptions).pollJob(
+                    jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);
+
+                Collection<String> extractFiles = null;
+                if (extractJob != null) {
+                  extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+                } else {
+                  IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+                  Collection<String> dirMatch = factory.match(extractDestinationDir);
+                  if (!dirMatch.isEmpty()) {
+                    extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+                  }
+                }
+                if (extractFiles != null && !extractFiles.isEmpty()) {
+                  new GcsUtilFactory().create(options).remove(extractFiles);
+                }
+              }};
+        return input.getPipeline()
+            .apply(org.apache.beam.sdk.io.Read.from(source))
+            .setCoder(getDefaultOutputCoder())
+            .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
       }
 
       @Override
@@ -522,6 +628,7 @@ public class BigQueryIO {
       @Override
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
+        TableReference table = getTable();
 
         if (table != null) {
           builder.add(DisplayData.item("table", toTableSpec(table)));
@@ -533,22 +640,26 @@ public class BigQueryIO {
             .addIfNotDefault(DisplayData.item("validation", validate), true);
       }
 
-      static {
-        DirectPipelineRunner.registerDefaultTransformEvaluator(
-            Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
-              @Override
-              public void evaluate(
-                  Bound transform, DirectPipelineRunner.EvaluationContext context) {
-                evaluateReadHelper(transform, context);
-              }
-            });
+      /**
+       * Returns the table to read, or {@code null} if reading from a query instead.
+       *
+       * <p>If the table's project is not specified, use the default one.
+       */
+      @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
+        TableReference table = getTable();
+        if (table != null && table.getProjectId() == null) {
+          // If user does not specify a project we assume the table to be located in
+          // the default project.
+          table.setProjectId(bqOptions.getProject());
+        }
+        return table;
       }
 
       /**
        * Returns the table to read, or {@code null} if reading from a query instead.
        */
       public TableReference getTable() {
-        return table;
+        return fromJsonString(jsonTableRef, TableReference.class);
       }
 
       /**
@@ -571,12 +682,567 @@ public class BigQueryIO {
       public Boolean getFlattenResults() {
         return flattenResults;
       }
+
+      private BigQueryServices getBigQueryServices() {
+        if (testBigQueryServices != null) {
+          return testBigQueryServices;
+        } else {
+          return new BigQueryServicesImpl();
+        }
+      }
     }
 
     /** Disallow construction of utility class. */
     private Read() {}
   }
 
+  /**
+   * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
+   * has been processed.
+   */
+  @VisibleForTesting
+  static class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
+    private CleanupOperation cleanupOperation;
+
+    PassThroughThenCleanup(CleanupOperation cleanupOperation) {
+      this.cleanupOperation = cleanupOperation;
+    }
+
+    @Override
+    public PCollection<T> apply(PCollection<T> input) {
+      TupleTag<T> mainOutput = new TupleTag<>();
+      TupleTag<Void> cleanupSignal = new TupleTag<>();
+      PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
+          .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
+
+      PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
+          .setCoder(VoidCoder.of())
+          .apply(View.<Void>asSingleton().withDefaultValue(null));
+
+      input.getPipeline()
+          .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
+          .apply("Cleanup", ParDo.of(
+              new DoFn<CleanupOperation, Void>() {
+                @Override
+                public void processElement(ProcessContext c)
+                    throws Exception {
+                  c.element().cleanup(c.getPipelineOptions());
+                }
+              }).withSideInputs(cleanupSignalView));
+
+      return outputs.get(mainOutput);
+    }
+
+    private static class IdentityFn<T> extends DoFn<T, T> {
+      @Override
+      public void processElement(ProcessContext c) {
+        c.output(c.element());
+      }
+    }
+
+    abstract static class CleanupOperation implements Serializable {
+      abstract void cleanup(PipelineOptions options) throws Exception;
+    }
+  }
+
+  /**
+   * A {@link BigQuerySourceBase} for reading BigQuery tables.
+   */
+  @VisibleForTesting
+  static class BigQueryTableSource extends BigQuerySourceBase {
+
+    static BigQueryTableSource create(
+        String jobIdToken,
+        String jsonTable,
+        String extractDestinationDir,
+        BigQueryServices bqServices) {
+      return new BigQueryTableSource(jobIdToken, jsonTable, extractDestinationDir, bqServices);
+    }
+
+    private final String jsonTable;
+    private final AtomicReference<Long> tableSizeBytes;
+
+    private BigQueryTableSource(
+        String jobIdToken,
+        String jsonTable,
+        String extractDestinationDir,
+        BigQueryServices bqServices) {
+      super(jobIdToken, extractDestinationDir, bqServices);
+      this.jsonTable = checkNotNull(jsonTable, "jsonTable");
+      this.tableSizeBytes = new AtomicReference<>();
+    }
+
+    @Override
+    protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
+      return JSON_FACTORY.fromString(jsonTable, TableReference.class);
+    }
+
+    @Override
+    public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      TableReference tableRef = JSON_FACTORY.fromString(jsonTable, TableReference.class);
+      return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
+    }
+
+    @Override
+    public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      if (tableSizeBytes.get() == null) {
+        TableReference table = JSON_FACTORY.fromString(jsonTable, TableReference.class);
+
+        Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
+            .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId())
+            .getNumBytes();
+        tableSizeBytes.compareAndSet(null, numBytes);
+      }
+      return tableSizeBytes.get();
+    }
+
+    @Override
+    protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+      // Do nothing.
+    }
+  }
+
+  /**
+   * A {@link BigQuerySourceBase} for querying BigQuery tables.
+   */
+  @VisibleForTesting
+  static class BigQueryQuerySource extends BigQuerySourceBase {
+
+    static BigQueryQuerySource create(
+        String jobIdToken,
+        String query,
+        String jsonQueryTempTable,
+        Boolean flattenResults,
+        String extractDestinationDir,
+        BigQueryServices bqServices) {
+      return new BigQueryQuerySource(
+          jobIdToken, query, jsonQueryTempTable, flattenResults, extractDestinationDir, bqServices);
+    }
+
+    private final String query;
+    private final String jsonQueryTempTable;
+    private final Boolean flattenResults;
+    private transient AtomicReference<JobStatistics> dryRunJobStats;
+
+    private BigQueryQuerySource(
+        String jobIdToken,
+        String query,
+        String jsonQueryTempTable,
+        Boolean flattenResults,
+        String extractDestinationDir,
+        BigQueryServices bqServices) {
+      super(jobIdToken, extractDestinationDir, bqServices);
+      this.query = checkNotNull(query, "query");
+      this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable");
+      this.flattenResults = checkNotNull(flattenResults, "flattenResults");
+      this.dryRunJobStats = new AtomicReference<>();
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
+    }
+
+    @Override
+    public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      return new BigQueryReader(this, bqServices.getReaderFromQuery(
+          bqOptions, query, bqOptions.getProject(), flattenResults));
+    }
+
+    @Override
+    protected TableReference getTableToExtract(BigQueryOptions bqOptions)
+        throws IOException, InterruptedException {
+      // 1. Find the location of the query.
+      TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions)
+          .getQuery()
+          .getReferencedTables()
+          .get(0);
+      DatasetService tableService = bqServices.getDatasetService(bqOptions);
+      String location = tableService.getTable(
+          dryRunTempTable.getProjectId(),
+          dryRunTempTable.getDatasetId(),
+          dryRunTempTable.getTableId()).getLocation();
+
+      // 2. Create the temporary dataset in the query location.
+      TableReference tableToExtract =
+          JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
+      tableService.createDataset(
+          tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
+
+      // 3. Execute the query.
+      String queryJobId = jobIdToken + "-query";
+      executeQuery(
+          queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions));
+      return tableToExtract;
+    }
+
+    @Override
+    protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+      TableReference tableToRemove =
+          JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
+
+      DatasetService tableService = bqServices.getDatasetService(bqOptions);
+      tableService.deleteTable(
+          tableToRemove.getProjectId(),
+          tableToRemove.getDatasetId(),
+          tableToRemove.getTableId());
+      tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+    }
+
+    private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
+        throws InterruptedException, IOException {
+      if (dryRunJobStats.get() == null) {
+        String projectId = bqOptions.getProject();
+        JobStatistics jobStats =
+            bqServices.getJobService(bqOptions).dryRunQuery(projectId, query);
+        dryRunJobStats.compareAndSet(null, jobStats);
+      }
+      return dryRunJobStats.get();
+    }
+
+    private static void executeQuery(
+        String jobId,
+        String query,
+        TableReference destinationTable,
+        boolean flattenResults,
+        JobService jobService) throws IOException, InterruptedException {
+      JobReference jobRef = new JobReference()
+          .setProjectId(destinationTable.getProjectId())
+          .setJobId(jobId);
+      JobConfigurationQuery queryConfig = new JobConfigurationQuery();
+      queryConfig
+          .setQuery(query)
+          .setAllowLargeResults(true)
+          .setCreateDisposition("CREATE_IF_NEEDED")
+          .setDestinationTable(destinationTable)
+          .setFlattenResults(flattenResults)
+          .setPriority("BATCH")
+          .setWriteDisposition("WRITE_EMPTY");
+      jobService.startQueryJob(jobRef, queryConfig);
+      Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+      if (parseStatus(job) != Status.SUCCEEDED) {
+        throw new IOException("Query job failed: " + jobId);
+      }
+      return;
+    }
+
+    private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+      in.defaultReadObject();
+      dryRunJobStats = new AtomicReference<>();
+    }
+  }
+
+  /**
+   * An abstract {@link BoundedSource} to read a table from BigQuery.
+   *
+   * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
+   * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource}
+   * and {@link BigQueryQuerySource}, depending on the configuration of the read.
+   * Specifically,
+   * <ul>
+   * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
+   * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
+   * </ul>
+   * ...
+   */
+  private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
+    // The maximum number of attempts to verify temp files.
+    private static final int MAX_FILES_VERIFY_ATTEMPTS = 10;
+
+    // The maximum number of retries to poll a BigQuery job.
+    protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+    // The initial backoff for verifying temp files.
+    private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+
+    protected final String jobIdToken;
+    protected final String extractDestinationDir;
+    protected final BigQueryServices bqServices;
+
+    private BigQuerySourceBase(
+        String jobIdToken,
+        String extractDestinationDir,
+        BigQueryServices bqServices) {
+      this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+      this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
+      this.bqServices = checkNotNull(bqServices, "bqServices");
+    }
+
+    @Override
+    public List<BoundedSource<TableRow>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+      TableReference tableToExtract = getTableToExtract(bqOptions);
+      JobService jobService = bqServices.getJobService(bqOptions);
+      String extractJobId = getExtractJobId(jobIdToken);
+      List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+      TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
+          tableToExtract.getProjectId(),
+          tableToExtract.getDatasetId(),
+          tableToExtract.getTableId()).getSchema();
+
+      cleanupTempResource(bqOptions);
+      return createSources(tempFiles, tableSchema);
+    }
+
+    protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
+
+    protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return false;
+    }
+
+    @Override
+    public void validate() {
+      // Do nothing, validation is done in BigQueryIO.Read.
+    }
+
+    /** Returns the {@link TableRowJsonCoder} singleton obtained from {@code TableRowJsonCoder.of()}. */
+    @Override
+    public Coder<TableRow> getDefaultOutputCoder() {
+      return TableRowJsonCoder.of();
+    }
+
+    /**
+     * Starts a BigQuery extract job that exports {@code table} in Avro format to the wildcard
+     * URI under {@code extractDestinationDir}, then polls the job until it is done.
+     *
+     * @param jobId ID given to the extract job
+     * @param table table to export; its project ID is also used as the project of the job
+     * @param jobService service used to start and to poll the job
+     * @return the paths of the files written by the job
+     * @throws IOException if the job did not succeed, including the case where polling gave up
+     *     and returned no job at all
+     */
+    private List<String> executeExtract(
+        String jobId, TableReference table, JobService jobService)
+            throws InterruptedException, IOException {
+      JobReference jobRef = new JobReference()
+          .setProjectId(table.getProjectId())
+          .setJobId(jobId);
+
+      String destinationUri = getExtractDestinationUri(extractDestinationDir);
+      JobConfigurationExtract extract = new JobConfigurationExtract()
+          .setSourceTable(table)
+          .setDestinationFormat("AVRO")
+          .setDestinationUris(ImmutableList.of(destinationUri));
+
+      LOG.info("Starting BigQuery extract job: {}", jobId);
+      jobService.startExtractJob(jobRef, extract);
+      Job extractJob =
+          jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+      if (parseStatus(extractJob) != Status.SUCCEEDED) {
+        // pollJob returns null once it runs out of attempts and parseStatus maps null to
+        // UNKNOWN, so extractJob must not be dereferenced unconditionally on this path.
+        throw new IOException(String.format(
+            "Extract job %s failed, status: %s",
+            jobId, extractJob == null ? Status.UNKNOWN : extractJob.getStatus()));
+      }
+
+      List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+      return ImmutableList.copyOf(tempFiles);
+    }
+
+    /**
+     * Wraps every extracted Avro file in a source that converts its records to {@link TableRow}s.
+     *
+     * <p>Before a file is wrapped it is probed with {@code getSizeBytes} until the result is no
+     * longer {@code -1}. The first probe happens immediately; further probes are spaced by an
+     * {@code AttemptBoundedExponentialBackOff} that is created afresh for every file. A file that
+     * never passes the probe is still added (best effort), and a warning is logged.
+     *
+     * @param files paths of the Avro files to read
+     * @param tableSchema schema used to convert each Avro record
+     * @return an immutable list with one source per entry of {@code files}, in the same order
+     */
+    private List<BoundedSource<TableRow>> createSources(
+        List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+      // The schema travels as a JSON string inside the function below.
+      final String jsonSchema = JSON_FACTORY.toString(tableSchema);
+
+      SerializableFunction<GenericRecord, TableRow> function =
+          new SerializableFunction<GenericRecord, TableRow>() {
+            @Override
+            public TableRow apply(GenericRecord input) {
+              try {
+                return AvroUtils.convertGenericRecordToTableRow(
+                    input, JSON_FACTORY.fromString(jsonSchema, TableSchema.class));
+              } catch (IOException e) {
+                throw new RuntimeException("Failed to convert GenericRecord to TableRow", e);
+              }
+            }};
+
+      List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
+      for (String fileName : files) {
+        // One backoff per file: a shared instance would let the first files use up the attempts
+        // and would make the sleeps grow from file to file.
+        BackOff backoff = new AttemptBoundedExponentialBackOff(
+            MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS);
+        // Probe before sleeping, so that a file that is already there costs no delay.
+        boolean verified = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1;
+        while (!verified && BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+          verified = IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1;
+        }
+        if (!verified) {
+          LOG.warn("Unable to verify extract file {}, reading it anyway.", fileName);
+        }
+        avroSources.add(new TransformingSource<>(
+            AvroSource.from(fileName), function, getDefaultOutputCoder()));
+      }
+      return ImmutableList.copyOf(avroSources);
+    }
+
+    /**
+     * A {@link BoundedSource.BoundedReader} that forwards {@code start}, {@code advance},
+     * {@code getCurrent} and {@code close} to a {@link BigQueryServices.BigQueryJsonReader}.
+     */
+    protected static class BigQueryReader extends BoundedSource.BoundedReader<TableRow> {
+      // Source reported by getCurrentSource().
+      private final BigQuerySourceBase source;
+      // Reader that does the actual work.
+      private final BigQueryServices.BigQueryJsonReader reader;
+
+      private BigQueryReader(
+          BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
+        this.source = source;
+        this.reader = reader;
+      }
+
+      @Override
+      public BoundedSource<TableRow> getCurrentSource() {
+        return source;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return reader.start();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return reader.advance();
+      }
+
+      @Override
+      public TableRow getCurrent() throws NoSuchElementException {
+        return reader.getCurrent();
+      }
+
+      @Override
+      public void close() throws IOException {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * A {@link BoundedSource} that reads from a {@code BoundedSource<T>}
+   * and transforms its elements to type {@code V}.
+   *
+   * <p>Splitting, size estimation, key ordering and validation are delegated to the wrapped
+   * source; {@code function} is applied to an element each time it is requested from a reader.
+   */
+  @VisibleForTesting
+  static class TransformingSource<T, V> extends BoundedSource<V> {
+    private final BoundedSource<T> boundedSource;
+    private final SerializableFunction<T, V> function;
+    private final Coder<V> outputCoder;
+
+    /**
+     * @param boundedSource source that supplies the untransformed elements
+     * @param function conversion applied to every element
+     * @param outputCoder coder reported for the transformed elements
+     * @throws NullPointerException if any argument is {@code null}
+     */
+    TransformingSource(
+        BoundedSource<T> boundedSource,
+        SerializableFunction<T, V> function,
+        Coder<V> outputCoder) {
+      // Fail at construction time rather than later, when the source is split or read.
+      this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+      this.function = checkNotNull(function, "function");
+      this.outputCoder = checkNotNull(outputCoder, "outputCoder");
+    }
+
+    /** Splits the wrapped source and wraps every resulting bundle with the same function. */
+    @Override
+    public List<? extends BoundedSource<V>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return Lists.transform(
+          boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
+          new Function<BoundedSource<T>, BoundedSource<V>>() {
+            @Override
+            public BoundedSource<V> apply(BoundedSource<T> input) {
+              return new TransformingSource<>(input, function, outputCoder);
+            }
+          });
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return boundedSource.getEstimatedSizeBytes(options);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return boundedSource.producesSortedKeys(options);
+    }
+
+    @Override
+    public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
+      return new TransformingReader(boundedSource.createReader(options));
+    }
+
+    @Override
+    public void validate() {
+      boundedSource.validate();
+    }
+
+    @Override
+    public Coder<V> getDefaultOutputCoder() {
+      return outputCoder;
+    }
+
+    /**
+     * Reader that delegates to a reader of the wrapped source and converts each element with
+     * the enclosing source's function.
+     */
+    private class TransformingReader extends BoundedReader<V> {
+      private final BoundedReader<T> boundedReader;
+
+      private TransformingReader(BoundedReader<T> boundedReader) {
+        this.boundedReader = boundedReader;
+      }
+
+      @Override
+      public synchronized BoundedSource<V> getCurrentSource() {
+        return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return boundedReader.start();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return boundedReader.advance();
+      }
+
+      @Override
+      public V getCurrent() throws NoSuchElementException {
+        T current = boundedReader.getCurrent();
+        return function.apply(current);
+      }
+
+      @Override
+      public void close() throws IOException {
+        boundedReader.close();
+      }
+
+      @Override
+      public synchronized BoundedSource<V> splitAtFraction(double fraction) {
+        // The wrapped reader may return null to signal that no split was performed; pass
+        // that on instead of wrapping null in a source.
+        BoundedSource<T> residual = boundedReader.splitAtFraction(fraction);
+        if (residual == null) {
+          return null;
+        }
+        return new TransformingSource<>(residual, function, outputCoder);
+      }
+
+      @Override
+      public Double getFractionConsumed() {
+        return boundedReader.getFractionConsumed();
+      }
+
+      @Override
+      public Instant getCurrentTimestamp() throws NoSuchElementException {
+        return boundedReader.getCurrentTimestamp();
+      }
+    }
+  }
+
+  /** Builds the extract job ID: the job ID token followed by {@code -extract}. */
+  private static String getExtractJobId(String jobIdToken) {
+    final String suffix = "-extract";
+    return jobIdToken + suffix;
+  }
+
+  /** Builds the wildcard destination URI: the directory, a slash, then {@code *.avro}. */
+  private static String getExtractDestinationUri(String extractDestinationDir) {
+    final String filePattern = "*.avro";
+    return extractDestinationDir + "/" + filePattern;
+  }
+
+  /**
+   * Lists the files written by a finished extract job.
+   *
+   * <p>The job must report exactly one destination-URI file count. File names are derived from
+   * that count: the zero-based file index, zero-padded to 12 digits, followed by {@code .avro},
+   * resolved against {@code extractDestinationDir}.
+   *
+   * @param extractDestinationDir directory that the extract job wrote into
+   * @param extractJob the finished extract job
+   * @return an immutable list of the file paths, ordered by file index
+   * @throws RuntimeException if the job reports no file count, or more than one
+   * @throws IOException propagated from the channel factory lookup or the path resolution
+   */
+  private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+      throws IOException {
+    JobStatistics jobStats = extractJob.getStatistics();
+    // Missing statistics are handled like an empty count list, so that the caller gets the
+    // descriptive error below instead of a NullPointerException.
+    List<Long> counts = (jobStats == null || jobStats.getExtract() == null)
+        ? null
+        : jobStats.getExtract().getDestinationUriFileCounts();
+    if (counts == null || counts.isEmpty()) {
+      throw new RuntimeException("No destination uri file count received.");
+    }
+    if (counts.size() != 1) {
+      throw new RuntimeException(String.format(
+          "More than one destination uri file count received. First two are %s, %s",
+          counts.get(0), counts.get(1)));
+    }
+    long filesCount = counts.get(0);
+
+    ImmutableList.Builder<String> paths = ImmutableList.builder();
+    IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+    for (long i = 0; i < filesCount; ++i) {
+      String filePath =
+          factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
+      paths.add(filePath);
+    }
+    return paths.build();
+  }
+
   /////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -799,8 +1465,15 @@ public class BigQueryIO {
        */
       @Deprecated
       public Bound() {
-        this(null, null, null, null, CreateDisposition.CREATE_IF_NEEDED,
-            WriteDisposition.WRITE_EMPTY, true, null);
+        this(
+            null /* name */,
+            null /* jsonTableRef */,
+            null /* tableRefFunction */,
+            null /* jsonSchema */,
+            CreateDisposition.CREATE_IF_NEEDED,
+            WriteDisposition.WRITE_EMPTY,
+            true /* validate */,
+            null /* testBigQueryServices */);
       }
 
       private Bound(String name, @Nullable String jsonTableRef,
@@ -1032,7 +1705,7 @@ public class BigQueryIO {
         if (Strings.isNullOrEmpty(table.getProjectId())) {
           table.setProjectId(options.getProject());
         }
-        String jobIdToken = UUID.randomUUID().toString();
+        String jobIdToken = randomUUIDString();
         String tempLocation = options.getTempLocation();
         String tempFilePrefix;
         try {
@@ -1181,7 +1854,7 @@ public class BigQueryIO {
 
       // The maximum number of retries to poll the status of a load job.
       // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
-      private static final int MAX_JOB_STATUS_POLL_RETRIES = Integer.MAX_VALUE;
+      private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
 
       private final BigQuerySink bigQuerySink;
 
@@ -1237,13 +1910,13 @@ public class BigQueryIO {
           @Nullable TableSchema schema,
           WriteDisposition writeDisposition,
           CreateDisposition createDisposition) throws InterruptedException, IOException {
-        JobConfigurationLoad loadConfig = new JobConfigurationLoad();
-        loadConfig.setSourceUris(gcsUris);
-        loadConfig.setDestinationTable(ref);
-        loadConfig.setSchema(schema);
-        loadConfig.setWriteDisposition(writeDisposition.name());
-        loadConfig.setCreateDisposition(createDisposition.name());
-        loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON");
+        JobConfigurationLoad loadConfig = new JobConfigurationLoad()
+            .setSourceUris(gcsUris)
+            .setDestinationTable(ref)
+            .setSchema(schema)
+            .setWriteDisposition(writeDisposition.name())
+            .setCreateDisposition(createDisposition.name())
+            .setSourceFormat("NEWLINE_DELIMITED_JSON");
 
         boolean retrying = false;
         String projectId = ref.getProjectId();
@@ -1253,9 +1926,12 @@ public class BigQueryIO {
             LOG.info("Previous load jobs failed, retrying.");
           }
           LOG.info("Starting BigQuery load job: {}", jobId);
-          jobService.startLoadJob(jobId, loadConfig);
-          Status jobStatus = parseStatus(
-              jobService.pollJob(projectId, jobId, MAX_JOB_STATUS_POLL_RETRIES));
+          JobReference jobRef = new JobReference()
+              .setProjectId(projectId)
+              .setJobId(jobId);
+          jobService.startLoadJob(jobRef, loadConfig);
+          Status jobStatus =
+              parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES));
           switch (jobStatus) {
             case SUCCEEDED:
               return;
@@ -1711,7 +2387,7 @@ public class BigQueryIO {
     UNKNOWN,
   }
 
-  private static Status parseStatus(@Nullable Job job) {
+  private static Status parseStatus(Job job) {
     if (job == null) {
       return Status.UNKNOWN;
     }
@@ -1725,7 +2401,8 @@ public class BigQueryIO {
     }
   }
 
-  private static String toJsonString(Object item) {
+  @VisibleForTesting
+  static String toJsonString(Object item) {
     if (item == null) {
       return null;
     }
@@ -1738,7 +2415,8 @@ public class BigQueryIO {
     }
   }
 
-  private static <T> T fromJsonString(String json, Class<T> clazz) {
+  @VisibleForTesting
+  static <T> T fromJsonString(String json, Class<T> clazz) {
     if (json == null) {
       return null;
     }
@@ -1751,51 +2429,20 @@ public class BigQueryIO {
     }
   }
 
-  /////////////////////////////////////////////////////////////////////////////
-
-  /** Disallow construction of utility class. */
-  private BigQueryIO() {}
-
   /**
-   * Direct mode read evaluator.
+   * Returns a randomUUID string.
    *
-   * <p>This loads the entire table into an in-memory PCollection.
+   * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset id.
    */
-  private static void evaluateReadHelper(
-      Read.Bound transform, DirectPipelineRunner.EvaluationContext context) {
-    BigQueryOptions options = context.getPipelineOptions();
-    Bigquery client = Transport.newBigQueryClient(options).build();
-    if (transform.table != null && transform.table.getProjectId() == null) {
-      transform.table.setProjectId(options.getProject());
-    }
-
-    BigQueryTableRowIterator iterator;
-    if (transform.query != null) {
-      LOG.info("Reading from BigQuery query {}", transform.query);
-      iterator =
-          BigQueryTableRowIterator.fromQuery(
-              transform.query, options.getProject(), client, transform.getFlattenResults());
-    } else {
-      LOG.info("Reading from BigQuery table {}", toTableSpec(transform.table));
-      iterator = BigQueryTableRowIterator.fromTable(transform.table, client);
-    }
-
-    try (BigQueryTableRowIterator ignored = iterator) {
-      List<TableRow> elems = new ArrayList<>();
-      iterator.open();
-      while (iterator.advance()) {
-        elems.add(iterator.getCurrent());
-      }
-      LOG.info("Number of records read from BigQuery: {}", elems.size());
-      context.setPCollection(context.getOutput(transform), elems);
-    } catch (IOException | InterruptedException e) {
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
-      }
-      throw new RuntimeException(e);
-    }
+  private static String randomUUIDString() {
+    // Plain literal replacement; the dash needs no regular expression.
+    String uuid = UUID.randomUUID().toString();
+    return uuid.replace("-", "");
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Disallow construction of utility class. */
+  private BigQueryIO() {}
+
   private static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
     List<V> value = map.get(key);
     if (value == null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
index b12e049..f82edf4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
@@ -19,13 +19,22 @@ package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.options.BigQueryOptions;
 
+import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
 
 /**
  * An interface for real, mock, or fake implementations of Cloud BigQuery services.
@@ -38,25 +47,40 @@ public interface BigQueryServices extends Serializable {
   public JobService getJobService(BigQueryOptions bqOptions);
 
   /**
+   * Returns a real, mock, or fake {@link DatasetService}.
+   */
+  public DatasetService getDatasetService(BigQueryOptions bqOptions);
+
+  /**
+   * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables.
+   */
+  public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef);
+
+  /**
+   * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
+   */
+  public BigQueryJsonReader getReaderFromQuery(
+      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten);
+
+  /**
    * An interface for the Cloud BigQuery load service.
    */
   public interface JobService {
     /**
-     * Starts a BigQuery load job.
+     * Start a BigQuery load job.
      */
-    void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
+    void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
         throws InterruptedException, IOException;
-
     /**
      * Start a BigQuery extract job.
      */
-    void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+    void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
         throws InterruptedException, IOException;
 
     /**
-     * Start a BigQuery extract job.
+     * Start a BigQuery query job.
      */
-    void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
+    void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
         throws IOException, InterruptedException;
 
     /**
@@ -64,7 +88,78 @@ public interface BigQueryServices extends Serializable {
      *
      * <p>Returns null if the {@code maxAttempts} retries reached.
      */
-    Job pollJob(String projectId, String jobId, int maxAttempts)
+    Job pollJob(JobReference jobRef, int maxAttempts)
         throws InterruptedException, IOException;
+
+    /**
+     * Dry runs the query in the given project.
+     */
+    JobStatistics dryRunQuery(String projectId, String query)
+        throws InterruptedException, IOException;
+  }
+
+  /**
+   * An interface to get, create and delete Cloud BigQuery datasets and tables.
+   */
+  public interface DatasetService {
+    /**
+     * Gets the specified {@link Table} resource by table ID.
+     */
+    Table getTable(String projectId, String datasetId, String tableId)
+        throws InterruptedException, IOException;
+
+    /**
+     * Deletes the table specified by tableId from the dataset.
+     * If the table contains data, all the data will be deleted.
+     */
+    void deleteTable(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException;
+
+    /**
+     * Creates a {@link Dataset} with the given {@code location} and {@code description}.
+     */
+    void createDataset(String projectId, String datasetId, String location, String description)
+        throws IOException, InterruptedException;
+
+    /**
+     * Deletes the dataset specified by the datasetId value.
+     *
+     * <p>Before you can delete a dataset, you must delete all its tables.
+     */
+    void deleteDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException;
+  }
+
+  /**
+   * An interface to read rows from Cloud BigQuery directly.
+   */
+  public interface BigQueryJsonReader {
+    /**
+     * Initializes the reader and advances the reader to the first record.
+     */
+    boolean start() throws IOException;
+
+    /**
+     * Advances the reader to the next valid record.
+     */
+    boolean advance() throws IOException;
+
+    /**
+     * Returns the value of the data item that was read by the last {@link #start} or
+     * {@link #advance} call. The returned value must be effectively immutable and remain valid
+     * indefinitely.
+     *
+     * <p>Multiple calls to this method without an intervening call to {@link #advance} should
+     * return the same result.
+     *
+     * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
+     *         the last {@link #start} or {@link #advance} returned {@code false}.
+     */
+    TableRow getCurrent() throws NoSuchElementException;
+
+    /**
+     * Closes the reader. The reader cannot be used after this method is called.
+     */
+    void close() throws IOException;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
index 2bfe84f..01ea45f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
@@ -20,17 +20,24 @@ package org.apache.beam.sdk.util;
 import org.apache.beam.sdk.options.BigQueryOptions;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
 import com.google.api.client.util.BackOff;
 import com.google.api.client.util.BackOffUtils;
 import com.google.api.client.util.Sleeper;
 import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobConfiguration;
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -38,14 +45,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.NoSuchElementException;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nullable;
+
 /**
  * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery
  * service.
  */
 public class BigQueryServicesImpl implements BigQueryServices {
 
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
+
   // The maximum number of attempts to execute a BigQuery RPC.
   private static final int MAX_RPC_ATTEMPTS = 10;
 
@@ -53,17 +65,31 @@ public class BigQueryServicesImpl implements BigQueryServices {
   private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
 
   // The initial backoff for polling the status of a BigQuery job.
-  private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);
+  private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
 
   @Override
   public JobService getJobService(BigQueryOptions options) {
     return new JobServiceImpl(options);
   }
 
+  /** Returns a new {@link DatasetServiceImpl} constructed from {@code options}. */
+  @Override
+  public DatasetService getDatasetService(BigQueryOptions options) {
+    return new DatasetServiceImpl(options);
+  }
+
+  /** Returns the reader produced by {@code BigQueryJsonReaderImpl.fromTable} for the table. */
+  @Override
+  public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
+    return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef);
+  }
+
+  /** Returns the reader produced by {@code BigQueryJsonReaderImpl.fromQuery} for the query. */
+  @Override
+  public BigQueryJsonReader getReaderFromQuery(
+      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
+    return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten);
+  }
+
   @VisibleForTesting
   static class JobServiceImpl implements BigQueryServices.JobService {
-    private static final Logger LOG = LoggerFactory.getLogger(JobServiceImpl.class);
-
     private final ApiErrorExtractor errorExtractor;
     private final Bigquery client;
 
@@ -81,22 +107,17 @@ public class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC retries.
+     * @throws IOException if it exceeds max RPC retries.
      */
     @Override
     public void startLoadJob(
-        String jobId,
+        JobReference jobRef,
         JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
-      Job job = new Job();
-      JobReference jobRef = new JobReference();
-      jobRef.setProjectId(loadConfig.getDestinationTable().getProjectId());
-      jobRef.setJobId(jobId);
-      job.setJobReference(jobRef);
-      JobConfiguration jobConfig = new JobConfiguration();
-      jobConfig.setLoad(loadConfig);
-      job.setConfiguration(jobConfig);
+      Job job = new Job()
+          .setJobReference(jobRef)
+          .setConfiguration(new JobConfiguration().setLoad(loadConfig));
 
       startJob(job, errorExtractor, client);
     }
@@ -104,21 +125,17 @@ public class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC retries.
+     * @throws IOException if it exceeds max RPC retries.
      */
     @Override
-    public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+    public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
         throws InterruptedException, IOException {
-      Job job = new Job();
-      JobReference jobRef = new JobReference();
-      jobRef.setProjectId(extractConfig.getSourceTable().getProjectId());
-      jobRef.setJobId(jobId);
-      job.setJobReference(jobRef);
-      JobConfiguration jobConfig = new JobConfiguration();
-      jobConfig.setExtract(extractConfig);
-      job.setConfiguration(jobConfig);
+      Job job = new Job()
+          .setJobReference(jobRef)
+          .setConfiguration(
+              new JobConfiguration().setExtract(extractConfig));
 
       startJob(job, errorExtractor, client);
     }
@@ -126,22 +143,17 @@ public class BigQueryServicesImpl implements BigQueryServices {
     /**
      * {@inheritDoc}
      *
-     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
      *
-     * @throws IOException if it exceeds max RPC retries.
+     * @throws IOException if it exceeds max RPC retries.
      */
     @Override
-    public void startQueryJob(String jobId, JobConfigurationQuery queryConfig, boolean dryRun)
+    public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
         throws IOException, InterruptedException {
-      Job job = new Job();
-      JobReference jobRef = new JobReference();
-      jobRef.setProjectId(queryConfig.getDestinationTable().getProjectId());
-      jobRef.setJobId(jobId);
-      job.setJobReference(jobRef);
-      JobConfiguration jobConfig = new JobConfiguration();
-      jobConfig.setQuery(queryConfig);
-      jobConfig.setDryRun(dryRun);
-      job.setConfiguration(jobConfig);
+      Job job = new Job()
+          .setJobReference(jobRef)
+          .setConfiguration(
+              new JobConfiguration().setQuery(queryConfig));
 
       startJob(job, errorExtractor, client);
     }
@@ -160,8 +172,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
         ApiErrorExtractor errorExtractor,
         Bigquery client,
         Sleeper sleeper,
-        BackOff backoff)
-        throws InterruptedException, IOException {
+        BackOff backoff) throws IOException, InterruptedException {
       JobReference jobRef = job.getJobReference();
       Exception lastException = null;
       do {
@@ -183,28 +194,27 @@ public class BigQueryServicesImpl implements BigQueryServices {
       } while (nextBackOff(sleeper, backoff));
       throw new IOException(
           String.format(
-              "Unable to insert job: %s, aborting after %d retries.",
+              "Unable to insert job: %s, aborting after %d .",
               jobRef.getJobId(), MAX_RPC_ATTEMPTS),
           lastException);
     }
 
     @Override
-    public Job pollJob(String projectId, String jobId, int maxAttempts)
+    public Job pollJob(JobReference jobRef, int maxAttempts)
         throws InterruptedException {
       BackOff backoff = new AttemptBoundedExponentialBackOff(
           maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
-      return pollJob(projectId, jobId, Sleeper.DEFAULT, backoff);
+      return pollJob(jobRef, Sleeper.DEFAULT, backoff);
     }
 
     @VisibleForTesting
     Job pollJob(
-        String projectId,
-        String jobId,
+        JobReference jobRef,
         Sleeper sleeper,
         BackOff backoff) throws InterruptedException {
       do {
         try {
-          Job job = client.jobs().get(projectId, jobId).execute();
+          Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
           JobStatus status = job.getStatus();
           if (status != null && status.getState() != null && status.getState().equals("DONE")) {
             return job;
@@ -215,21 +225,254 @@ public class BigQueryServicesImpl implements BigQueryServices {
           LOG.warn("Ignore the error and retry polling job status.", e);
         }
       } while (nextBackOff(sleeper, backoff));
-      LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId);
+      LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId());
       return null;
     }
 
+    @Override
+    public JobStatistics dryRunQuery(String projectId, String query)
+        throws InterruptedException, IOException {
+      Job job = new Job()
+          .setConfiguration(new JobConfiguration()
+              .setQuery(new JobConfigurationQuery()
+                  .setQuery(query))
+              .setDryRun(true));
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      return executeWithRetries(
+          client.jobs().insert(projectId, job),
+          String.format(
+              "Unable to dry run query: %s, aborting after %d retries.",
+              query, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff).getStatistics();
+    }
+  }
+
+  @VisibleForTesting
+  static class DatasetServiceImpl implements DatasetService {
+    private final ApiErrorExtractor errorExtractor;
+    private final Bigquery client;
+
+    @VisibleForTesting
+    DatasetServiceImpl(Bigquery client) {
+      this.errorExtractor = new ApiErrorExtractor();
+      this.client = client;
+    }
+
+    private DatasetServiceImpl(BigQueryOptions bqOptions) {
+      this.errorExtractor = new ApiErrorExtractor();
+      this.client = Transport.newBigQueryClient(bqOptions).build();
+    }
+
     /**
-     * Identical to {@link BackOffUtils#next} but without checked IOException.
-     * @throws InterruptedException
+     * {@inheritDoc}
+     *
+     * <p>Tries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds max RPC attempts.
      */
-    private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
-        throws InterruptedException {
+    @Override
+    public Table getTable(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      return executeWithRetries(
+          client.tables().get(projectId, datasetId, tableId),
+          String.format(
+              "Unable to get table: %s, aborting after %d retries.",
+              tableId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Tries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds max RPC attempts.
+     */
+    @Override
+    public void deleteTable(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      executeWithRetries(
+          client.tables().delete(projectId, datasetId, tableId),
+          String.format(
+              "Unable to delete table: %s, aborting after %d retries.",
+              tableId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Tries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds max RPC attempts.
+     */
+    @Override
+    public void createDataset(
+        String projectId, String datasetId, String location, String description)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
+    }
+
+    @VisibleForTesting
+    void createDataset(
+        String projectId,
+        String datasetId,
+        String location,
+        String description,
+        Sleeper sleeper,
+        BackOff backoff) throws IOException, InterruptedException {
+      DatasetReference datasetRef = new DatasetReference()
+          .setProjectId(projectId)
+          .setDatasetId(datasetId);
+
+      Dataset dataset = new Dataset()
+          .setDatasetReference(datasetRef)
+          .setLocation(location)
+          .setFriendlyName(description) // display name, not the location
+          .setDescription(description);
+
+      Exception lastException; // assigned on every path that reaches the final throw
+      do {
+        try {
+          client.datasets().insert(projectId, dataset).execute();
+          return; // SUCCEEDED
+        } catch (GoogleJsonResponseException e) {
+          if (errorExtractor.itemAlreadyExists(e)) {
+            return; // SUCCEEDED: an already existing dataset counts as created
+          }
+          // any other API error: log it, keep it as the cause, and retry
+          LOG.warn("Ignore the error and retry creating the dataset.", e);
+          lastException = e;
+        } catch (IOException e) {
+          LOG.warn("Ignore the error and retry creating the dataset.", e);
+          lastException = e;
+        }
+      } while (nextBackOff(sleeper, backoff));
+      throw new IOException(
+          String.format(
+              "Unable to create dataset: %s, aborting after %d attempts.",
+              datasetId, MAX_RPC_ATTEMPTS),
+          lastException);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Tries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds max RPC attempts.
+     */
+    @Override
+    public void deleteDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      executeWithRetries(
+          client.datasets().delete(projectId, datasetId),
+          String.format(
+              "Unable to delete dataset: %s, aborting after %d retries.",
+              datasetId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+  }
+
+  private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
+    BigQueryTableRowIterator iterator;
+
+    private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
+      this.iterator = iterator;
+    }
+
+    private static BigQueryJsonReader fromQuery(
+        BigQueryOptions bqOptions,
+        String query,
+        String projectId,
+        @Nullable Boolean flattenResults) {
+      return new BigQueryJsonReaderImpl(
+          BigQueryTableRowIterator.fromQuery(
+              query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults));
+    }
+
+    private static BigQueryJsonReader fromTable(
+        BigQueryOptions bqOptions,
+        TableReference tableRef) {
+      return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
+          tableRef, Transport.newBigQueryClient(bqOptions).build()));
+    }
+
+    @Override
+    public boolean start() throws IOException {
       try {
-        return BackOffUtils.next(sleeper, backoff);
+        iterator.open();
+        return iterator.advance();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted during start() operation", e);
+      }
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      try {
+        return iterator.advance();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted during advance() operation", e);
+      }
+    }
+
+    @Override
+    public TableRow getCurrent() throws NoSuchElementException {
+      return iterator.getCurrent();
+    }
+
+    @Override
+    public void close() throws IOException {
+      iterator.close();
+    }
+  }
+
+  @VisibleForTesting
+  static <T> T executeWithRetries(
+      AbstractGoogleClientRequest<T> request,
+      String errorMessage,
+      Sleeper sleeper,
+      BackOff backoff)
+      throws IOException, InterruptedException {
+    Exception lastException = null;
+    do {
+      try {
+        return request.execute();
       } catch (IOException e) {
-        throw new RuntimeException(e);
+        LOG.warn("Ignore the error and retry the request.", e);
+        lastException = e;
       }
+    } while (nextBackOff(sleeper, backoff));
+    throw new IOException(
+        errorMessage,
+        lastException);
+  }
+
+  /**
+   * Identical to {@link BackOffUtils#next} but wraps the IOException in a RuntimeException.
+   * @throws InterruptedException if interrupted while sleeping before the next attempt
+   */
+  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
+    try {
+      return BackOffUtils.next(sleeper, backoff);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
     }
   }
 }