Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:02:58 UTC

[03/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
new file mode 100644
index 0000000..3e77362
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.options.BigQueryOptions;
+
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * An interface for real, mock, or fake implementations of Cloud BigQuery services.
+ */
+interface BigQueryServices extends Serializable {
+
+  /**
+   * Returns a real, mock, or fake {@link JobService}.
+   */
+  JobService getJobService(BigQueryOptions bqOptions);
+
+  /**
+   * Returns a real, mock, or fake {@link DatasetService}.
+   */
+  DatasetService getDatasetService(BigQueryOptions bqOptions);
+
+  /**
+   * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables.
+   */
+  BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef);
+
+  /**
+   * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
+   */
+  BigQueryJsonReader getReaderFromQuery(
+      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten);
+
+  /**
+   * An interface for the Cloud BigQuery job service.
+   */
+  interface JobService {
+    /**
+     * Start a BigQuery load job.
+     */
+    void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
+        throws InterruptedException, IOException;
+
+    /**
+     * Start a BigQuery extract job.
+     */
+    void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
+        throws InterruptedException, IOException;
+
+    /**
+     * Start a BigQuery query job.
+     */
+    void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
+        throws IOException, InterruptedException;
+
+    /**
+     * Waits for the job to reach the DONE state, and returns it.
+     *
+     * <p>Returns null if the job has not completed after {@code maxAttempts} polling attempts.
+     */
+    Job pollJob(JobReference jobRef, int maxAttempts)
+        throws InterruptedException, IOException;
+
+    /**
+     * Dry runs the query in the given project.
+     */
+    JobStatistics dryRunQuery(String projectId, String query)
+        throws InterruptedException, IOException;
+  }
+
+  /**
+   * An interface to get, create and delete Cloud BigQuery datasets and tables.
+   */
+  interface DatasetService {
+    /**
+     * Gets the specified {@link Table} resource by table ID.
+     */
+    Table getTable(String projectId, String datasetId, String tableId)
+        throws InterruptedException, IOException;
+
+    /**
+     * Deletes the table specified by tableId from the dataset.
+     * If the table contains data, all the data will be deleted.
+     */
+    void deleteTable(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException;
+
+    /**
+     * Returns true if the table is empty.
+     */
+    boolean isTableEmpty(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException;
+
+    /**
+     * Gets the specified {@link Dataset} resource by dataset ID.
+     */
+    Dataset getDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException;
+
+    /**
+     * Creates a {@link Dataset} with the given {@code location} and {@code description}.
+     */
+    void createDataset(String projectId, String datasetId, String location, String description)
+        throws IOException, InterruptedException;
+
+    /**
+     * Deletes the dataset specified by the datasetId value.
+     *
+     * <p>Before you can delete a dataset, you must delete all its tables.
+     */
+    void deleteDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException;
+  }
+
+  /**
+   * An interface to read from Cloud BigQuery directly.
+   */
+  interface BigQueryJsonReader {
+    /**
+     * Initializes the reader and advances the reader to the first record.
+     */
+    boolean start() throws IOException;
+
+    /**
+     * Advances the reader to the next valid record.
+     */
+    boolean advance() throws IOException;
+
+    /**
+     * Returns the value of the data item that was read by the last {@link #start} or
+     * {@link #advance} call. The returned value must be effectively immutable and remain valid
+     * indefinitely.
+     *
+     * <p>Multiple calls to this method without an intervening call to {@link #advance} should
+     * return the same result.
+     *
+     * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
+     *         the last {@link #start} or {@link #advance} returned {@code false}.
+     */
+    TableRow getCurrent() throws NoSuchElementException;
+
+    /**
+     * Closes the reader. The reader cannot be used after this method is called.
+     */
+    void close() throws IOException;
+  }
+}
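
The BigQueryJsonReader contract above mirrors Beam's bounded source readers: start() opens the
reader and advances to the first record, advance() moves to the next one, and getCurrent() returns
the record last read. A minimal consumption sketch, assuming a BigQueryServices instance,
BigQueryOptions, and a TableReference are already in hand (all three names are placeholders,
IOException handling is omitted, and the snippet is not part of this commit; the interface is
package-private, so such code would live in org.apache.beam.sdk.io.gcp.bigquery):

    BigQueryServices.BigQueryJsonReader reader =
        services.getReaderFromTable(options, tableRef);
    try {
      // start() opens the reader and positions it on the first record, if any.
      for (boolean more = reader.start(); more; more = reader.advance()) {
        TableRow row = reader.getCurrent();
        // ... process the current row ...
      }
    } finally {
      reader.close();
    }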

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
new file mode 100644
index 0000000..414baae
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.Transport;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link BigQueryServices} that actually communicates with the Cloud BigQuery
+ * service.
+ */
+class BigQueryServicesImpl implements BigQueryServices {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
+
+  // The maximum number of attempts to execute a BigQuery RPC.
+  private static final int MAX_RPC_ATTEMPTS = 10;
+
+  // The initial backoff for executing a BigQuery RPC.
+  private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+
+  // The initial backoff for polling the status of a BigQuery job.
+  private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+
+  @Override
+  public JobService getJobService(BigQueryOptions options) {
+    return new JobServiceImpl(options);
+  }
+
+  @Override
+  public DatasetService getDatasetService(BigQueryOptions options) {
+    return new DatasetServiceImpl(options);
+  }
+
+  @Override
+  public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
+    return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef);
+  }
+
+  @Override
+  public BigQueryJsonReader getReaderFromQuery(
+      BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
+    return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten);
+  }
+
+  @VisibleForTesting
+  static class JobServiceImpl implements BigQueryServices.JobService {
+    private final ApiErrorExtractor errorExtractor;
+    private final Bigquery client;
+
+    @VisibleForTesting
+    JobServiceImpl(Bigquery client) {
+      this.errorExtractor = new ApiErrorExtractor();
+      this.client = client;
+    }
+
+    private JobServiceImpl(BigQueryOptions options) {
+      this.errorExtractor = new ApiErrorExtractor();
+      this.client = Transport.newBigQueryClient(options).build();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public void startLoadJob(
+        JobReference jobRef,
+        JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
+      Job job = new Job()
+          .setJobReference(jobRef)
+          .setConfiguration(new JobConfiguration().setLoad(loadConfig));
+
+      startJob(job, errorExtractor, client);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
+        throws InterruptedException, IOException {
+      Job job = new Job()
+          .setJobReference(jobRef)
+          .setConfiguration(
+              new JobConfiguration().setExtract(extractConfig));
+
+      startJob(job, errorExtractor, client);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
+        throws IOException, InterruptedException {
+      Job job = new Job()
+          .setJobReference(jobRef)
+          .setConfiguration(
+              new JobConfiguration().setQuery(queryConfig));
+
+      startJob(job, errorExtractor, client);
+    }
+
+    private static void startJob(Job job,
+      ApiErrorExtractor errorExtractor,
+      Bigquery client) throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
+    }
+
+    @VisibleForTesting
+    static void startJob(
+        Job job,
+        ApiErrorExtractor errorExtractor,
+        Bigquery client,
+        Sleeper sleeper,
+        BackOff backoff) throws IOException, InterruptedException {
+      JobReference jobRef = job.getJobReference();
+      Exception lastException = null;
+      do {
+        try {
+          client.jobs().insert(jobRef.getProjectId(), job).execute();
+          return; // SUCCEEDED
+        } catch (GoogleJsonResponseException e) {
+          if (errorExtractor.itemAlreadyExists(e)) {
+            return; // SUCCEEDED
+          }
+          // ignore and retry
+          LOG.warn("Ignore the error and retry inserting the job.", e);
+          lastException = e;
+        } catch (IOException e) {
+          // ignore and retry
+          LOG.warn("Ignore the error and retry inserting the job.", e);
+          lastException = e;
+        }
+      } while (nextBackOff(sleeper, backoff));
+      throw new IOException(
+          String.format(
+              "Unable to insert job: %s, aborting after %d .",
+              jobRef.getJobId(), MAX_RPC_ATTEMPTS),
+          lastException);
+    }
+
+    @Override
+    public Job pollJob(JobReference jobRef, int maxAttempts)
+        throws InterruptedException {
+      BackOff backoff = new AttemptBoundedExponentialBackOff(
+          maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
+      return pollJob(jobRef, Sleeper.DEFAULT, backoff);
+    }
+
+    @VisibleForTesting
+    Job pollJob(
+        JobReference jobRef,
+        Sleeper sleeper,
+        BackOff backoff) throws InterruptedException {
+      do {
+        try {
+          Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
+          JobStatus status = job.getStatus();
+          if (status != null && status.getState() != null && status.getState().equals("DONE")) {
+            return job;
+          }
+          // The job is not DONE, wait longer and retry.
+        } catch (IOException e) {
+          // ignore and retry
+          LOG.warn("Ignore the error and retry polling job status.", e);
+        }
+      } while (nextBackOff(sleeper, backoff));
+      LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId());
+      return null;
+    }
+
+    @Override
+    public JobStatistics dryRunQuery(String projectId, String query)
+        throws InterruptedException, IOException {
+      Job job = new Job()
+          .setConfiguration(new JobConfiguration()
+              .setQuery(new JobConfigurationQuery()
+                  .setQuery(query))
+              .setDryRun(true));
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      return executeWithRetries(
+          client.jobs().insert(projectId, job),
+          String.format(
+              "Unable to dry run query: %s, aborting after %d retries.",
+              query, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff).getStatistics();
+    }
+  }
+
+  @VisibleForTesting
+  static class DatasetServiceImpl implements DatasetService {
+    private final ApiErrorExtractor errorExtractor;
+    private final Bigquery client;
+
+    private DatasetServiceImpl(BigQueryOptions bqOptions) {
+      this.errorExtractor = new ApiErrorExtractor();
+      this.client = Transport.newBigQueryClient(bqOptions).build();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public Table getTable(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      return executeWithRetries(
+          client.tables().get(projectId, datasetId, tableId),
+          String.format(
+              "Unable to get table: %s, aborting after %d retries.",
+              tableId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public void deleteTable(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      executeWithRetries(
+          client.tables().delete(projectId, datasetId, tableId),
+          String.format(
+              "Unable to delete table: %s, aborting after %d retries.",
+              tableId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+
+    @Override
+    public boolean isTableEmpty(String projectId, String datasetId, String tableId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      TableDataList dataList = executeWithRetries(
+          client.tabledata().list(projectId, datasetId, tableId),
+          String.format(
+              "Unable to list table data: %s, aborting after %d retries.",
+              tableId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+      return dataList.getRows() == null || dataList.getRows().isEmpty();
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public Dataset getDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      return executeWithRetries(
+          client.datasets().get(projectId, datasetId),
+          String.format(
+              "Unable to get dataset: %s, aborting after %d retries.",
+              datasetId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public void createDataset(
+        String projectId, String datasetId, String location, String description)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
+    }
+
+    @VisibleForTesting
+    void createDataset(
+        String projectId,
+        String datasetId,
+        String location,
+        String description,
+        Sleeper sleeper,
+        BackOff backoff) throws IOException, InterruptedException {
+      DatasetReference datasetRef = new DatasetReference()
+          .setProjectId(projectId)
+          .setDatasetId(datasetId);
+
+      Dataset dataset = new Dataset()
+          .setDatasetReference(datasetRef)
+          .setLocation(location)
+          .setFriendlyName(location)
+          .setDescription(description);
+
+      Exception lastException;
+      do {
+        try {
+          client.datasets().insert(projectId, dataset).execute();
+          return; // SUCCEEDED
+        } catch (GoogleJsonResponseException e) {
+          if (errorExtractor.itemAlreadyExists(e)) {
+            return; // SUCCEEDED
+          }
+          // ignore and retry
+          LOG.warn("Ignore the error and retry creating the dataset.", e);
+          lastException = e;
+        } catch (IOException e) {
+          LOG.warn("Ignore the error and retry creating the dataset.", e);
+          lastException = e;
+        }
+      } while (nextBackOff(sleeper, backoff));
+      throw new IOException(
+          String.format(
+              "Unable to create dataset: %s, aborting after %d .",
+              datasetId, MAX_RPC_ATTEMPTS),
+          lastException);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+     *
+     * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} attempts.
+     */
+    @Override
+    public void deleteDataset(String projectId, String datasetId)
+        throws IOException, InterruptedException {
+      BackOff backoff =
+          new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+      executeWithRetries(
+          client.datasets().delete(projectId, datasetId),
+          String.format(
+              "Unable to delete table: %s, aborting after %d retries.",
+              datasetId, MAX_RPC_ATTEMPTS),
+          Sleeper.DEFAULT,
+          backoff);
+    }
+  }
+
+  private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
+    BigQueryTableRowIterator iterator;
+
+    private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
+      this.iterator = iterator;
+    }
+
+    private static BigQueryJsonReader fromQuery(
+        BigQueryOptions bqOptions,
+        String query,
+        String projectId,
+        @Nullable Boolean flattenResults) {
+      return new BigQueryJsonReaderImpl(
+          BigQueryTableRowIterator.fromQuery(
+              query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults));
+    }
+
+    private static BigQueryJsonReader fromTable(
+        BigQueryOptions bqOptions,
+        TableReference tableRef) {
+      return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
+          tableRef, Transport.newBigQueryClient(bqOptions).build()));
+    }
+
+    @Override
+    public boolean start() throws IOException {
+      try {
+        iterator.open();
+        return iterator.advance();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted during start() operation", e);
+      }
+    }
+
+    @Override
+    public boolean advance() throws IOException {
+      try {
+        return iterator.advance();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted during advance() operation", e);
+      }
+    }
+
+    @Override
+    public TableRow getCurrent() throws NoSuchElementException {
+      return iterator.getCurrent();
+    }
+
+    @Override
+    public void close() throws IOException {
+      iterator.close();
+    }
+  }
+
+  @VisibleForTesting
+  static <T> T executeWithRetries(
+      AbstractGoogleClientRequest<T> request,
+      String errorMessage,
+      Sleeper sleeper,
+      BackOff backoff)
+      throws IOException, InterruptedException {
+    Exception lastException = null;
+    do {
+      try {
+        return request.execute();
+      } catch (IOException e) {
+        LOG.warn("Ignore the error and retry the request.", e);
+        lastException = e;
+      }
+    } while (nextBackOff(sleeper, backoff));
+    throw new IOException(
+        errorMessage,
+        lastException);
+  }
+
+  /**
+   * Identical to {@link BackOffUtils#next}, but without the checked {@link IOException}.
+   *
+   * @throws InterruptedException if interrupted while sleeping before the next attempt.
+   */
+  private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
+    try {
+      return BackOffUtils.next(sleeper, backoff);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
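
The retry scheme above is uniform: each RPC is wrapped in an attempt-bounded exponential backoff,
job insertion treats "already exists" as success, and pollJob returns null once the attempt budget
is spent. A sketch of how a caller inside this package might drive it (the project and job ids are
placeholders, InterruptedException handling is omitted, and the snippet is not part of the commit):

    BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class);
    BigQueryServices.JobService jobService = new BigQueryServicesImpl().getJobService(options);

    JobReference jobRef = new JobReference()
        .setProjectId("my-project")        // placeholder project id
        .setJobId("beam_load_job_12345");  // placeholder job id

    // Polls up to 10 times with exponential backoff; null means the job never reached DONE.
    Job job = jobService.pollJob(jobRef, 10);
    if (job == null) {
      // Gave up after the maximum number of polling attempts.
    }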

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
new file mode 100644
index 0000000..00a4fa3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * Inserts rows into BigQuery.
+ */
+class BigQueryTableInserter {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
+
+  // Approximate amount of table data to upload per InsertAll request.
+  private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
+
+  // The maximum number of rows to upload per InsertAll request.
+  private static final long MAX_ROWS_PER_BATCH = 500;
+
+  // The maximum number of times to retry inserting rows into BigQuery.
+  private static final int MAX_INSERT_ATTEMPTS = 5;
+
+  // The initial backoff after a failure inserting rows into BigQuery.
+  private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
+
+  // Backoff time bounds for rate limit exceeded errors.
+  private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
+  private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
+
+  private final Bigquery client;
+  private final long maxRowsPerBatch;
+
+  private ExecutorService executor;
+
+  /**
+   * Constructs a new row inserter.
+   *
+   * @param client a BigQuery client
+   * @param options a PipelineOptions object
+   */
+  BigQueryTableInserter(Bigquery client, PipelineOptions options) {
+    this.client = client;
+    this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+    this.executor = options.as(GcsOptions.class).getExecutorService();
+  }
+
+  /**
+   * Constructs a new row inserter.
+   *
+   * @param client a BigQuery client
+   * @param options a PipelineOptions object
+   * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
+   */
+  BigQueryTableInserter(Bigquery client, PipelineOptions options,
+                               int maxRowsPerBatch) {
+    this.client = client;
+    this.maxRowsPerBatch = maxRowsPerBatch;
+    this.executor = options.as(GcsOptions.class).getExecutorService();
+  }
+
+  /**
+   * Inserts all rows from the given list.
+   */
+  void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
+    insertAll(ref, rowList, null, null);
+  }
+
+  /**
+   * Inserts all rows from the given list, using the specified insert IDs if not null. Tracks the
+   * count of bytes written with the given Aggregator.
+   */
+  void insertAll(TableReference ref, List<TableRow> rowList,
+      @Nullable List<String> insertIdList, @Nullable Aggregator<Long, Long> byteCountAggregator)
+      throws IOException {
+    checkNotNull(ref, "ref");
+    if (insertIdList != null && rowList.size() != insertIdList.size()) {
+      throw new AssertionError("If insertIdList is not null it needs to have at least "
+          + "as many elements as rowList");
+    }
+
+    AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
+        MAX_INSERT_ATTEMPTS,
+        INITIAL_INSERT_BACKOFF_INTERVAL_MS);
+
+    List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
+    // These lists contain the rows to publish. Initially they contain the entire list. If there are
+    // failures, they will contain only the failed rows to be retried.
+    List<TableRow> rowsToPublish = rowList;
+    List<String> idsToPublish = insertIdList;
+    while (true) {
+      List<TableRow> retryRows = new ArrayList<>();
+      List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
+
+      int strideIndex = 0;
+      // Upload in batches.
+      List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
+      int dataSize = 0;
+
+      List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
+      List<Integer> strideIndices = new ArrayList<>();
+
+      for (int i = 0; i < rowsToPublish.size(); ++i) {
+        TableRow row = rowsToPublish.get(i);
+        TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+        if (idsToPublish != null) {
+          out.setInsertId(idsToPublish.get(i));
+        }
+        out.setJson(row.getUnknownKeys());
+        rows.add(out);
+
+        dataSize += row.toString().length();
+        if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
+            || i == rowsToPublish.size() - 1) {
+          TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+          content.setRows(rows);
+
+          final Bigquery.Tabledata.InsertAll insert = client.tabledata()
+              .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
+                  content);
+
+          futures.add(
+              executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
+                @Override
+                public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
+                  BackOff backoff = new IntervalBoundedExponentialBackOff(
+                      MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
+                  while (true) {
+                    try {
+                      return insert.execute().getInsertErrors();
+                    } catch (IOException e) {
+                      if (new ApiErrorExtractor().rateLimited(e)) {
+                        LOG.info("BigQuery insertAll exceeded rate limit, retrying");
+                        try {
+                          Thread.sleep(backoff.nextBackOffMillis());
+                        } catch (InterruptedException interrupted) {
+                          // Restore the interrupted state before surfacing the failure.
+                          Thread.currentThread().interrupt();
+                          throw new IOException(
+                              "Interrupted while waiting before retrying insertAll");
+                        }
+                      } else {
+                        throw e;
+                      }
+                    }
+                  }
+                }
+              }));
+          strideIndices.add(strideIndex);
+
+          if (byteCountAggregator != null) {
+            byteCountAggregator.addValue((long) dataSize);
+          }
+          dataSize = 0;
+          strideIndex = i + 1;
+          rows = new LinkedList<>();
+        }
+      }
+
+      try {
+        for (int i = 0; i < futures.size(); i++) {
+          List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
+          if (errors != null) {
+            for (TableDataInsertAllResponse.InsertErrors error : errors) {
+              allErrors.add(error);
+              if (error.getIndex() == null) {
+                throw new IOException("Insert failed: " + allErrors);
+              }
+
+              int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
+              retryRows.add(rowsToPublish.get(errorIndex));
+              if (retryIds != null) {
+                retryIds.add(idsToPublish.get(errorIndex));
+              }
+            }
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while inserting " + rowsToPublish);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e.getCause());
+      }
+
+      if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
+        try {
+          Thread.sleep(backoff.nextBackOffMillis());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
+        }
+        LOG.info("Retrying failed inserts to BigQuery");
+        rowsToPublish = retryRows;
+        idsToPublish = retryIds;
+        allErrors.clear();
+      } else {
+        break;
+      }
+    }
+    if (!allErrors.isEmpty()) {
+      throw new IOException("Insert failed: " + allErrors);
+    }
+  }
+
+  /**
+   * Retrieves or creates the table.
+   *
+   * <p>The table is checked to conform to insertion requirements as specified
+   * by WriteDisposition and CreateDisposition.
+   *
+   * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
+   * this will re-create the table if necessary to ensure it is empty.
+   *
+   * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
+   * will fail if the table exists and is not empty.
+   *
+   * <p>When constructing a table, a {@code TableSchema} must be available.  If a
+   * schema is provided, then it will be used.  If no schema is provided, but
+   * an existing table is being cleared (WRITE_TRUNCATE option above), then
+   * the existing schema will be re-used.  If no schema is available, then an
+   * {@code IOException} is thrown.
+   */
+  Table getOrCreateTable(
+      TableReference ref,
+      WriteDisposition writeDisposition,
+      CreateDisposition createDisposition,
+      @Nullable TableSchema schema) throws IOException {
+    // Check if table already exists.
+    Bigquery.Tables.Get get = client.tables()
+        .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+    Table table = null;
+    try {
+      table = get.execute();
+    } catch (IOException e) {
+      ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+      if (!errorExtractor.itemNotFound(e)
+          || createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
+        // Rethrow.
+        throw e;
+      }
+    }
+
+    // If we want an empty table, and it isn't, then delete it first.
+    if (table != null) {
+      if (writeDisposition == WriteDisposition.WRITE_APPEND) {
+        return table;
+      }
+
+      boolean empty = isEmpty(ref);
+      if (empty) {
+        if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
+          LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
+        }
+        return table;
+
+      } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
+        throw new IOException("WriteDisposition is WRITE_EMPTY, "
+            + "but table is not empty");
+      }
+
+      // Reuse the existing schema if none was provided.
+      if (schema == null) {
+        schema = table.getSchema();
+      }
+
+      // Delete table and fall through to re-creating it below.
+      LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
+      Bigquery.Tables.Delete delete = client.tables()
+          .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+      delete.execute();
+    }
+
+    if (schema == null) {
+      throw new IllegalArgumentException(
+          "Table schema required for new table.");
+    }
+
+    // Create the table.
+    return tryCreateTable(ref, schema);
+  }
+
+  /**
+   * Checks if a table is empty.
+   */
+  private boolean isEmpty(TableReference ref) throws IOException {
+    Bigquery.Tabledata.List list = client.tabledata()
+        .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+    list.setMaxResults(1L);
+    TableDataList dataList = list.execute();
+
+    return dataList.getRows() == null || dataList.getRows().isEmpty();
+  }
+
+  /**
+   * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
+   * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
+   * configured with a table spec function to use different tables for each window.
+   */
+  private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
+
+  /**
+   * Tries to create the BigQuery table.
+   * If a table with the same name already exists in the dataset, the table
+   * creation fails, and the function returns null.  In such a case,
+   * the existing table doesn't necessarily have the same schema as specified
+   * by the parameter.
+   *
+   * @param schema Schema of the new BigQuery table.
+   * @return The newly created BigQuery table information, or null if the table
+   *     with the same name already exists.
+   * @throws IOException if an error other than the table already existing occurs.
+   */
+  @Nullable
+  private Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
+    LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
+    BackOff backoff =
+        new ExponentialBackOff.Builder()
+            .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
+            .build();
+
+    Table table = new Table().setTableReference(ref).setSchema(schema);
+    return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
+  }
+
+  @VisibleForTesting
+  @Nullable
+  Table tryCreateTable(
+      Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
+          throws IOException {
+    boolean retry = false;
+    while (true) {
+      try {
+        return client.tables().insert(projectId, datasetId, table).execute();
+      } catch (IOException e) {
+        ApiErrorExtractor extractor = new ApiErrorExtractor();
+        if (extractor.itemAlreadyExists(e)) {
+          // The table already exists, nothing to return.
+          return null;
+        } else if (extractor.rateLimited(e)) {
+          // The request failed because we hit a temporary quota. Back off and try again.
+          try {
+            if (BackOffUtils.next(sleeper, backoff)) {
+              if (!retry) {
+                LOG.info(
+                    "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
+                    projectId,
+                    datasetId,
+                    table.getTableReference().getTableId(),
+                    TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
+                retry = true;
+              }
+              continue;
+            }
+          } catch (InterruptedException e1) {
+            // Restore interrupted state and throw the last failure.
+            Thread.currentThread().interrupt();
+            throw e;
+          }
+        }
+        throw e;
+      }
+    }
+  }
+}
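
BigQueryTableInserter batches rows by approximate byte size (UPLOAD_BATCH_SIZE_BYTES) and row
count (MAX_ROWS_PER_BATCH), submits each batch on the shared executor, and retries only the failed
rows with bounded backoff. A usage sketch under assumed names (project, dataset, table, and schema
below are placeholders; checked-exception handling is omitted; the class is package-private, so
this would sit alongside BigQueryIO rather than in user code):

    Bigquery client = Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build();
    BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);

    TableReference ref = new TableReference()
        .setProjectId("my-project")    // placeholder
        .setDatasetId("my_dataset")    // placeholder
        .setTableId("my_table");       // placeholder

    TableSchema schema = new TableSchema().setFields(ImmutableList.of(
        new TableFieldSchema().setName("name").setType("STRING")));

    // Creates the table if needed (WRITE_APPEND keeps existing rows), then streams one row.
    inserter.getOrCreateTable(
        ref, WriteDisposition.WRITE_APPEND, CreateDisposition.CREATE_IF_NEEDED, schema);
    inserter.insertAll(ref, Collections.singletonList(new TableRow().set("name", "beam")));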

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
new file mode 100644
index 0000000..3afdffa
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ClassInfo;
+import com.google.api.client.util.Data;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * Iterates over all rows in a table.
+ */
+class BigQueryTableRowIterator implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class);
+
+  @Nullable private TableReference ref;
+  @Nullable private final String projectId;
+  @Nullable private TableSchema schema;
+  private final Bigquery client;
+  private String pageToken;
+  private Iterator<TableRow> iteratorOverCurrentBatch;
+  private TableRow current;
+  // Set true when the final page is seen from the service.
+  private boolean lastPage = false;
+
+  // The maximum number of times a BigQuery request will be retried
+  private static final int MAX_RETRIES = 3;
+  // Initial wait time for the backoff implementation
+  private static final Duration INITIAL_BACKOFF_TIME = Duration.standardSeconds(1);
+
+  // After sending a query to the BigQuery service, poll it at the following interval to check
+  // the status of the query execution job.
+  private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
+
+  private final String query;
+  // Whether to flatten query results.
+  private final boolean flattenResults;
+  // Temporary dataset used to store query results.
+  private String temporaryDatasetId = null;
+  // Temporary table used to store query results.
+  private String temporaryTableId = null;
+
+  private BigQueryTableRowIterator(
+      @Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
+      Bigquery client, boolean flattenResults) {
+    this.ref = ref;
+    this.query = query;
+    this.projectId = projectId;
+    this.client = checkNotNull(client, "client");
+    this.flattenResults = flattenResults;
+  }
+
+  /**
+   * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
+   */
+  public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
+    checkNotNull(ref, "ref");
+    checkNotNull(client, "client");
+    return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true);
+  }
+
+  /**
+   * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
+   * specified query in the specified project.
+   */
+  public static BigQueryTableRowIterator fromQuery(
+      String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) {
+    checkNotNull(query, "query");
+    checkNotNull(projectId, "projectId");
+    checkNotNull(client, "client");
+    return new BigQueryTableRowIterator(null, query, projectId, client,
+        MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
+  }
+
+  /**
+   * Opens the table for read.
+   * @throws IOException on failure
+   */
+  public void open() throws IOException, InterruptedException {
+    if (query != null) {
+      ref = executeQueryAndWaitForCompletion();
+    }
+    // Get table schema.
+    Bigquery.Tables.Get get =
+        client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+
+    Table table =
+        executeWithBackOff(
+            get,
+            "Error opening BigQuery table  %s of dataset %s  : {}",
+            ref.getTableId(),
+            ref.getDatasetId());
+    schema = table.getSchema();
+  }
+
+  public boolean advance() throws IOException, InterruptedException {
+    while (true) {
+      if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
+        // Embed schema information into the raw row, so that values have an
+        // associated key.  This matches how rows are read when using the
+        // DataflowRunner.
+        current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
+        return true;
+      }
+      if (lastPage) {
+        return false;
+      }
+
+      Bigquery.Tabledata.List list =
+          client.tabledata().list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+      if (pageToken != null) {
+        list.setPageToken(pageToken);
+      }
+
+      TableDataList result =
+          executeWithBackOff(
+              list,
+              "Error reading from BigQuery table %s of dataset %s : {}",
+              ref.getTableId(),
+              ref.getDatasetId());
+
+      pageToken = result.getPageToken();
+      iteratorOverCurrentBatch =
+          result.getRows() != null
+              ? result.getRows().iterator()
+              : Collections.<TableRow>emptyIterator();
+
+      // The server may return a page token indefinitely on a zero-length table.
+      if (pageToken == null || result.getTotalRows() != null && result.getTotalRows() == 0) {
+        lastPage = true;
+      }
+    }
+  }
+
+  public TableRow getCurrent() {
+    if (current == null) {
+      throw new NoSuchElementException();
+    }
+    return current;
+  }
+
+  /**
+   * Adjusts a field returned from the BigQuery API to match what we will receive when running
+   * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
+   * used for batch jobs executed on the Cloud Dataflow service.
+   *
+   * <p>The following is the relationship between BigQuery schema and Java types:
+   *
+   * <ul>
+   *   <li>Nulls are {@code null}.
+   *   <li>Repeated fields are {@code List} of objects.
+   *   <li>Record columns are {@link TableRow} objects.
+   *   <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
+   *   <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
+   *   <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
+   *       {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
+   *       zeros and can be 1 to 6 digits long.
+   *   <li>Every other atomic type is a {@code String}.
+   * </ul>
+   *
+   * <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
+   *
+   * <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
+   * and are not accessible through the {@link TableRow#getF} function.
+   */
+  @Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
+    if (Data.isNull(v)) {
+      return null;
+    }
+
+    if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
+      TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
+      @SuppressWarnings("unchecked")
+      List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
+      ImmutableList.Builder<Object> values = ImmutableList.builder();
+      for (Map<String, Object> element : rawCells) {
+        values.add(getTypedCellValue(elementSchema, element.get("v")));
+      }
+      return values.build();
+    }
+
+    if (fieldSchema.getType().equals("RECORD")) {
+      @SuppressWarnings("unchecked")
+      Map<String, Object> typedV = (Map<String, Object>) v;
+      return getTypedTableRow(fieldSchema.getFields(), typedV);
+    }
+
+    if (fieldSchema.getType().equals("FLOAT")) {
+      return Double.parseDouble((String) v);
+    }
+
+    if (fieldSchema.getType().equals("BOOLEAN")) {
+      return Boolean.parseBoolean((String) v);
+    }
+
+    if (fieldSchema.getType().equals("TIMESTAMP")) {
+      return BigQueryAvroUtils.formatTimestamp((String) v);
+    }
+
+    return v;
+  }
+
+  /**
+   * A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
+   * because they are reserved keywords in {@link TableRow}.
+   */
+  // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
+  // not indirect through our broken use of {@link TableRow}.
+  //     See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
+  private static final Collection<String> RESERVED_FIELD_NAMES =
+      ClassInfo.of(TableRow.class).getNames();
+
+  /**
+   * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
+   * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
+   * the cells are converted to Java types according to the provided field schemas.
+   *
+   * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
+   * types are mapped to Java types.
+   */
+  private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
+    // If rawRow is a TableRow, use it. If not, create a new one.
+    TableRow row;
+    List<? extends Map<String, Object>> cells;
+    if (rawRow instanceof TableRow) {
+      // Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
+      // any type conversion, but extract the cells for cell-wise processing below.
+      row = (TableRow) rawRow;
+      cells = row.getF();
+      // Clear the cells from the row, so that row.getF() will return null. This matches the
+      // behavior of rows produced by the BigQuery export API used on the service.
+      row.setF(null);
+    } else {
+      row = new TableRow();
+
+      // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
+      // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
+      // we will use Map.get("v") instead of TableCell.getV() get its value.
+      @SuppressWarnings("unchecked")
+      List<? extends Map<String, Object>> rawCells =
+          (List<? extends Map<String, Object>>) rawRow.get("f");
+      cells = rawCells;
+    }
+
+    checkState(cells.size() == fields.size(),
+        "Expected that the row has the same number of cells %s as fields in the schema %s",
+        cells.size(), fields.size());
+
+    // Loop through all the fields in the row, normalizing their types with the TableFieldSchema
+    // and storing the normalized values by field name in the Map<String, Object> that
+    // underlies the TableRow.
+    Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
+    Iterator<TableFieldSchema> fieldIt = fields.iterator();
+    while (cellIt.hasNext()) {
+      Map<String, Object> cell = cellIt.next();
+      TableFieldSchema fieldSchema = fieldIt.next();
+
+      // Convert the object in this cell to the Java type corresponding to its type in the schema.
+      Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
+
+      String fieldName = fieldSchema.getName();
+      checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
+          "BigQueryIO does not support records with columns named %s", fieldName);
+
+      if (convertedValue == null) {
+        // BigQuery does not include null values when the export operation (to JSON) is used.
+        // To match that behavior, BigQueryTableRowIterator and the DirectRunner
+        // intentionally omit columns with null values.
+        continue;
+      }
+
+      row.set(fieldName, convertedValue);
+    }
+    return row;
+  }
+
+  // Create a new BigQuery dataset
+  private void createDataset(String datasetId) throws IOException, InterruptedException {
+    Dataset dataset = new Dataset();
+    DatasetReference reference = new DatasetReference();
+    reference.setProjectId(projectId);
+    reference.setDatasetId(datasetId);
+    dataset.setDatasetReference(reference);
+
+    String createDatasetError =
+        "Error when trying to create the temporary dataset " + datasetId + " in project "
+        + projectId;
+    executeWithBackOff(
+        client.datasets().insert(projectId, dataset), createDatasetError + " :{}");
+  }
+
+  // Delete the given table from the given dataset.
+  private void deleteTable(String datasetId, String tableId)
+      throws IOException, InterruptedException {
+    executeWithBackOff(
+        client.tables().delete(projectId, datasetId, tableId),
+        "Error when trying to delete the temporary table " + datasetId + " in dataset " + datasetId
+        + " of project " + projectId + ". Manual deletion may be required. Error message : {}");
+  }
+
+  // Delete the given dataset. This will fail if the given dataset has any tables.
+  private void deleteDataset(String datasetId) throws IOException, InterruptedException {
+    executeWithBackOff(
+        client.datasets().delete(projectId, datasetId),
+        "Error when trying to delete the temporary dataset " + datasetId + " in project "
+        + projectId + ". Manual deletion may be required. Error message : {}");
+  }
+
+  /**
+   * Executes the specified query and returns a reference to the temporary BigQuery table created
+   * to hold the results.
+   *
+   * @throws IOException if the query fails.
+   */
+  private TableReference executeQueryAndWaitForCompletion()
+      throws IOException, InterruptedException {
+    // Create a temporary dataset to store results.
+    // Start the dataset name with an "_" so that it is hidden.
+    Random rnd = new Random(System.currentTimeMillis());
+    temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
+    temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
+
+    createDataset(temporaryDatasetId);
+    Job job = new Job();
+    JobConfiguration config = new JobConfiguration();
+    JobConfigurationQuery queryConfig = new JobConfigurationQuery();
+    config.setQuery(queryConfig);
+    job.setConfiguration(config);
+    queryConfig.setQuery(query);
+    queryConfig.setAllowLargeResults(true);
+    queryConfig.setFlattenResults(flattenResults);
+
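+    // allowLargeResults requires an explicit destination table, so point the query at the
+    // temporary table inside the dataset created above.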
+    TableReference destinationTable = new TableReference();
+    destinationTable.setProjectId(projectId);
+    destinationTable.setDatasetId(temporaryDatasetId);
+    destinationTable.setTableId(temporaryTableId);
+    queryConfig.setDestinationTable(destinationTable);
+
+    Insert insert = client.jobs().insert(projectId, job);
+    Job queryJob = executeWithBackOff(
+        insert, "Error when trying to execute the job for query " + query + " :{}");
+    JobReference jobId = queryJob.getJobReference();
+
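+    // Poll the query job until it reports the DONE state. DONE only means the job finished, so
+    // check errorResult below to determine whether it actually succeeded.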
+    while (true) {
+      Job pollJob = executeWithBackOff(
+          client.jobs().get(projectId, jobId.getJobId()),
+          "Error when trying to get status of the job for query " + query + " :{}");
+      JobStatus status = pollJob.getStatus();
+      if (status.getState().equals("DONE")) {
+        // Job is DONE, but did not necessarily succeed.
+        ErrorProto error = status.getErrorResult();
+        if (error == null) {
+          return pollJob.getConfiguration().getQuery().getDestinationTable();
+        } else {
+          // There will be no temporary table to delete, so null out the reference.
+          temporaryTableId = null;
+          throw new IOException("Executing query " + query + " failed: " + error.getMessage());
+        }
+      }
+      Uninterruptibles.sleepUninterruptibly(
+          QUERY_COMPLETION_POLL_TIME.getMillis(), TimeUnit.MILLISECONDS);
+    }
+  }
+
+  // Execute a BQ request with exponential backoff and return the result.
+  // client - BQ request to be executed
+  // error - Formatted message to log when a request fails. Takes the exception message as a
+  // formatter parameter.
+  public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
+      Object... errorArgs) throws IOException, InterruptedException {
+    Sleeper sleeper = Sleeper.DEFAULT;
+    BackOff backOff =
+        new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
+
+    T result = null;
+    while (true) {
+      try {
+        result = client.execute();
+        break;
+      } catch (IOException e) {
+        LOG.error(String.format(error, errorArgs), e.getMessage());
+        if (!BackOffUtils.next(sleeper, backOff)) {
+          LOG.error(
+              String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times.");
+          throw e;
+        }
+      }
+    }
+
+    return result;
+  }
+
+  @Override
+  public void close() {
+    // Prevent any further requests.
+    lastPage = true;
+
+    try {
+      // Delete the temporary table and dataset that get generated when executing a query.
+      if (temporaryDatasetId != null) {
+        if (temporaryTableId != null) {
+          deleteTable(temporaryDatasetId, temporaryTableId);
+        }
+        deleteDataset(temporaryDatasetId);
+      }
+    } catch (IOException | InterruptedException e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(e);
+    }
+  }
+}
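
For reviewers who want the retry behavior above in isolation, here is a minimal,
self-contained sketch of the same bounded retry pattern. It uses the stock
ExponentialBackOff from google-http-client rather than the SDK's
AttemptBoundedExponentialBackOff used in the diff, and the wrapper class name and
backoff limits are illustrative only, not part of this change.

    import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
    import com.google.api.client.util.BackOff;
    import com.google.api.client.util.BackOffUtils;
    import com.google.api.client.util.ExponentialBackOff;
    import com.google.api.client.util.Sleeper;

    import java.io.IOException;

    class BackOffSketch {
      /** Executes the request, retrying on IOException until the backoff policy gives up. */
      static <T> T executeWithRetries(AbstractGoogleClientRequest<T> request)
          throws IOException, InterruptedException {
        BackOff backOff = new ExponentialBackOff.Builder()
            .setInitialIntervalMillis(1000)        // first wait is roughly one second
            .setMaxElapsedTimeMillis(60 * 1000)    // give up after about a minute of retrying
            .build();
        while (true) {
          try {
            return request.execute();
          } catch (IOException e) {
            // next() sleeps for the current interval and returns false once the policy is done.
            if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
              throw e;
            }
          }
        }
      }
    }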

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java
new file mode 100644
index 0000000..a6fc693
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines transforms for reading and writing from Google BigQuery.
+ *
+ * @see org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
new file mode 100644
index 0000000..316392f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.Lists;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link BigQueryAvroUtils}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryAvroUtilsTest {
+  @Test
+  public void testConvertGenericRecordToTableRow() throws Exception {
+    TableSchema tableSchema = new TableSchema();
+    List<TableFieldSchema> subFields = Lists.<TableFieldSchema>newArrayList(
+        new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"));
+    /*
+     * Note that the quality and quantity fields do not have their mode set, so they should default
+     * to NULLABLE. This is an important test of BigQuery semantics.
+     *
+     * All the other fields we set in this function are required on the Schema response.
+     *
+     * See https://cloud.google.com/bigquery/docs/reference/v2/tables#schema
+     */
+    List<TableFieldSchema> fields =
+        Lists.<TableFieldSchema>newArrayList(
+            new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"),
+            new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"),
+            new TableFieldSchema().setName("quality").setType("FLOAT") /* default to NULLABLE */,
+            new TableFieldSchema().setName("quantity").setType("INTEGER") /* default to NULLABLE */,
+            new TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
+            new TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
+            new TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE")
+                .setFields(subFields),
+            new TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED")
+                .setFields(subFields));
+    tableSchema.setFields(fields);
+    Schema avroSchema = AvroCoder.of(Bird.class).getSchema();
+
+    {
+      // Test nullable fields.
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("number", 5L);
+      TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+      TableRow row = new TableRow()
+          .set("number", "5")
+          .set("associates", new ArrayList<TableRow>());
+      assertEquals(row, convertedRow);
+    }
+    {
+      // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, and FLOAT.
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("number", 5L);
+      record.put("quality", 5.0);
+      record.put("birthday", 5L);
+      record.put("flighted", Boolean.TRUE);
+      TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+      TableRow row = new TableRow()
+          .set("number", "5")
+          .set("birthday", "1970-01-01 00:00:00.000005 UTC")
+          .set("quality", 5.0)
+          .set("associates", new ArrayList<TableRow>())
+          .set("flighted", Boolean.TRUE);
+      assertEquals(row, convertedRow);
+    }
+    {
+      // Test repeated fields.
+      Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema();
+      GenericRecord nestedRecord = new GenericData.Record(subBirdSchema);
+      nestedRecord.put("species", "other");
+      GenericRecord record = new GenericData.Record(avroSchema);
+      record.put("number", 5L);
+      record.put("associates", Lists.<GenericRecord>newArrayList(nestedRecord));
+      TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+      TableRow row = new TableRow()
+          .set("associates", Lists.<TableRow>newArrayList(
+              new TableRow().set("species", "other")))
+          .set("number", "5");
+      assertEquals(row, convertedRow);
+    }
+  }
+
+  /**
+   * POJO class used as the record type in tests.
+   */
+  @DefaultCoder(AvroCoder.class)
+  @SuppressWarnings("unused")  // Used by Avro reflection.
+  static class Bird {
+    long number;
+    @Nullable String species;
+    @Nullable Double quality;
+    @Nullable Long quantity;
+    @Nullable Long birthday;  // Exercises TIMESTAMP.
+    @Nullable Boolean flighted;
+    @Nullable SubBird scion;
+    SubBird[] associates;
+
+    static class SubBird {
+      @Nullable String species;
+
+      public SubBird() {}
+    }
+
+    public Bird() {
+      associates = new SubBird[1];
+      associates[0] = new SubBird();
+    }
+  }
+}