Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:02:57 UTC

[02/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
new file mode 100644
index 0000000..00e7891
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -0,0 +1,1231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.fromJsonString;
+import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.toJsonString;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.hamcrest.Matchers.hasItem;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryQuerySource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.BigQueryTableSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.SourceTestUtils;
+import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.api.client.util.Data;
+import com.google.api.client.util.Strings;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.JobStatistics2;
+import com.google.api.services.bigquery.model.JobStatistics4;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+/**
+ * Tests for {@link BigQueryIO}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIOTest implements Serializable {
+
+  // Status.UNKNOWN has no entry in this map, so lookups for it return null.
+  private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
+      Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
+      Status.FAILED, new Job().setStatus(new JobStatus().setErrorResult(new ErrorProto())));
+
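+  /**
+   * A fake implementation of {@link BigQueryServices} that returns canned job, dataset, and
+   * reader results for tests.
+   */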
+  private static class FakeBigQueryServices implements BigQueryServices {
+
+    private String[] jsonTableRowReturns = new String[0];
+    private JobService jobService;
+    private DatasetService datasetService;
+
+    public FakeBigQueryServices withJobService(JobService jobService) {
+      this.jobService = jobService;
+      return this;
+    }
+
+    public FakeBigQueryServices withDatasetService(DatasetService datasetService) {
+      this.datasetService = datasetService;
+      return this;
+    }
+
+    public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
+      this.jsonTableRowReturns = jsonTableRowReturns;
+      return this;
+    }
+
+    @Override
+    public JobService getJobService(BigQueryOptions bqOptions) {
+      return jobService;
+    }
+
+    @Override
+    public DatasetService getDatasetService(BigQueryOptions bqOptions) {
+      return datasetService;
+    }
+
+    @Override
+    public BigQueryJsonReader getReaderFromTable(
+        BigQueryOptions bqOptions, TableReference tableRef) {
+      return new FakeBigQueryReader(jsonTableRowReturns);
+    }
+
+    @Override
+    public BigQueryJsonReader getReaderFromQuery(
+        BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
+      return new FakeBigQueryReader(jsonTableRowReturns);
+    }
+
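+    /** A fake reader that iterates over a fixed array of JSON-encoded {@link TableRow}s. */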
+    private static class FakeBigQueryReader implements BigQueryJsonReader {
+      private static final int UNSTARTED = -1;
+      private static final int CLOSED = Integer.MAX_VALUE;
+
+      private String[] jsonTableRowReturns;
+      private int currIndex;
+
+      FakeBigQueryReader(String[] jsonTableRowReturns) {
+        this.jsonTableRowReturns = jsonTableRowReturns;
+        this.currIndex = UNSTARTED;
+      }
+
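+      // May be called only once; positions the reader at the first row, if any.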
+      @Override
+      public boolean start() throws IOException {
+        assertEquals(UNSTARTED, currIndex);
+        currIndex = 0;
+        return currIndex < jsonTableRowReturns.length;
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return ++currIndex < jsonTableRowReturns.length;
+      }
+
+      @Override
+      public TableRow getCurrent() throws NoSuchElementException {
+        if (currIndex >= jsonTableRowReturns.length) {
+          throw new NoSuchElementException();
+        }
+        return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
+      }
+
+      @Override
+      public void close() throws IOException {
+        currIndex = CLOSED;
+      }
+    }
+  }
+
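+  /**
+   * A fake {@link JobService} that replays scripted results for start and poll calls and can
+   * verify the executing project of each job.
+   */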
+  private static class FakeJobService implements JobService, Serializable {
+
+    private Object[] startJobReturns;
+    private Object[] pollJobReturns;
+    private String executingProject;
+    // Both counts are reset to zero after serialization.
+    // This is a workaround for DoFn's verifyUnmodified check.
+    private transient int startJobCallsCount;
+    private transient int pollJobStatusCallsCount;
+
+    public FakeJobService() {
+      this.startJobReturns = new Object[0];
+      this.pollJobReturns = new Object[0];
+      this.startJobCallsCount = 0;
+      this.pollJobStatusCallsCount = 0;
+    }
+
+    /**
+     * Sets the scripted return values for {@link JobService#startLoadJob},
+     * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}.
+     *
+     * <p>Each call throws if the next {@link Object} is an {@link Exception},
+     * and returns normally otherwise.
+     */
+    public FakeJobService startJobReturns(Object... startJobReturns) {
+      this.startJobReturns = startJobReturns;
+      return this;
+    }
+
+    /**
+     * Sets the scripted return values for {@link JobService#pollJob}.
+     *
+     * <p>Each call throws if the next {@link Object} is an {@link Exception};
+     * otherwise it returns the {@link Job} directly, or the canned job mapped
+     * from a {@link Status}.
+     */
+    public FakeJobService pollJobReturns(Object... pollJobReturns) {
+      this.pollJobReturns = pollJobReturns;
+      return this;
+    }
+
+    /**
+     * Sets the executing project; subsequent job calls verify that each job's
+     * project id matches it.
+     */
+    public FakeJobService verifyExecutingProject(String executingProject) {
+      this.executingProject = executingProject;
+      return this;
+    }
+
+    @Override
+    public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
+        throws InterruptedException, IOException {
+      startJob(jobRef);
+    }
+
+    @Override
+    public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
+        throws InterruptedException, IOException {
+      startJob(jobRef);
+    }
+
+    @Override
+    public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
+        throws IOException, InterruptedException {
+      startJob(jobRef);
+    }
+
+    @Override
+    public Job pollJob(JobReference jobRef, int maxAttempts)
+        throws InterruptedException {
+      if (!Strings.isNullOrEmpty(executingProject)) {
+        checkArgument(
+            jobRef.getProjectId().equals(executingProject),
+            "Project id: %s is not equal to executing project: %s",
+            jobRef.getProjectId(), executingProject);
+      }
+
+      if (pollJobStatusCallsCount < pollJobReturns.length) {
+        Object ret = pollJobReturns[pollJobStatusCallsCount++];
+        if (ret instanceof Job) {
+          return (Job) ret;
+        } else if (ret instanceof Status) {
+          return JOB_STATUS_MAP.get(ret);
+        } else if (ret instanceof InterruptedException) {
+          throw (InterruptedException) ret;
+        } else {
+          throw new RuntimeException("Unexpected return type: " + ret.getClass());
+        }
+      } else {
+        throw new RuntimeException(
+            "Exceeded expected number of calls: " + pollJobReturns.length);
+      }
+    }
+
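+    // Common to the three start*Job methods: verifies the executing project, then replays
+    // (or throws) the next scripted start-job result.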
+    private void startJob(JobReference jobRef) throws IOException, InterruptedException {
+      if (!Strings.isNullOrEmpty(executingProject)) {
+        checkArgument(
+            jobRef.getProjectId().equals(executingProject),
+            "Project id: %s is not equal to executing project: %s",
+            jobRef.getProjectId(), executingProject);
+      }
+
+      if (startJobCallsCount < startJobReturns.length) {
+        Object ret = startJobReturns[startJobCallsCount++];
+        if (ret instanceof IOException) {
+          throw (IOException) ret;
+        } else if (ret instanceof InterruptedException) {
+          throw (InterruptedException) ret;
+        } else {
+          return;
+        }
+      } else {
+        throw new RuntimeException(
+            "Exceeded expected number of calls: " + startJobReturns.length);
+      }
+    }
+
+    @Override
+    public JobStatistics dryRunQuery(String projectId, String query)
+        throws InterruptedException, IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class);
+  @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
+  @Mock(extraInterfaces = Serializable.class)
+  public transient BigQueryServices.JobService mockJobService;
+  @Mock private transient IOChannelFactory mockIOChannelFactory;
+  @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
+
+  private transient BigQueryOptions bqOptions;
+
+  private void checkReadTableObject(
+      BigQueryIO.Read.Bound bound, String project, String dataset, String table) {
+    checkReadTableObjectWithValidate(bound, project, dataset, table, true);
+  }
+
+  private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
+    checkReadQueryObjectWithValidate(bound, query, true);
+  }
+
+  private void checkReadTableObjectWithValidate(
+      BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) {
+    assertEquals(project, bound.getTable().getProjectId());
+    assertEquals(dataset, bound.getTable().getDatasetId());
+    assertEquals(table, bound.getTable().getTableId());
+    assertNull(bound.query);
+    assertEquals(validate, bound.getValidate());
+  }
+
+  private void checkReadQueryObjectWithValidate(
+      BigQueryIO.Read.Bound bound, String query, boolean validate) {
+    assertNull(bound.getTable());
+    assertEquals(query, bound.query);
+    assertEquals(validate, bound.getValidate());
+  }
+
+  private void checkWriteObject(
+      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+      TableSchema schema, CreateDisposition createDisposition,
+      WriteDisposition writeDisposition) {
+    checkWriteObjectWithValidate(
+        bound, project, dataset, table, schema, createDisposition, writeDisposition, true);
+  }
+
+  private void checkWriteObjectWithValidate(
+      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
+      TableSchema schema, CreateDisposition createDisposition,
+      WriteDisposition writeDisposition, boolean validate) {
+    assertEquals(project, bound.getTable().getProjectId());
+    assertEquals(dataset, bound.getTable().getDatasetId());
+    assertEquals(table, bound.getTable().getTableId());
+    assertEquals(schema, bound.getSchema());
+    assertEquals(createDisposition, bound.createDisposition);
+    assertEquals(writeDisposition, bound.writeDisposition);
+    assertEquals(validate, bound.validate);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    bqOptions.setProject("defaultProject");
+    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
+
+    MockitoAnnotations.initMocks(this);
+  }
+
+  @Test
+  public void testBuildTableBasedSource() {
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+  }
+
+  @Test
+  public void testBuildQueryBasedSource() {
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
+    checkReadQueryObject(bound, "foo_query");
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithoutValidation() {
+    // This test just checks that using withoutValidation will not trigger object
+    // construction errors.
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
+    checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
+  }
+
+  @Test
+  public void testBuildQueryBasedSourceWithoutValidation() {
+    // This test just checks that using withoutValidation will not trigger object
+    // construction errors.
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.fromQuery("some_query").withoutValidation();
+    checkReadQueryObjectWithValidate(bound, "some_query", false);
+  }
+
+  @Test
+  public void testBuildTableBasedSourceWithDefaultProject() {
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.from("somedataset.sometable");
+    checkReadTableObject(bound, null, "somedataset", "sometable");
+  }
+
+  @Test
+  public void testBuildSourceWithTableReference() {
+    TableReference table = new TableReference()
+        .setProjectId("foo.com:project")
+        .setDatasetId("somedataset")
+        .setTableId("sometable");
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
+    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+  }
+
+  @Test
+  public void testValidateReadSetsDefaultProject() throws Exception {
+    String projectId = "someproject";
+    String datasetId = "somedataset";
+    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    options.setProject(projectId);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService);
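+    // Simulate a dataset whose presence cannot be confirmed, so validation must fail.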
+    when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
+        new RuntimeException("Unable to confirm BigQuery dataset presence"));
+
+    Pipeline p = TestPipeline.create(options);
+
+    TableReference tableRef = new TableReference();
+    tableRef.setDatasetId(datasetId);
+    tableRef.setTableId("sometable");
+
+    thrown.expect(RuntimeException.class);
+    // The message will be one of the following, depending on the execution environment.
+    thrown.expectMessage(
+        Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
+            .or(Matchers.containsString("BigQuery dataset not found for table")));
+    p.apply(BigQueryIO.Read.from(tableRef)
+        .withTestServices(fakeBqServices));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithoutTableQueryOrValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation, either table reference or query has to be set");
+    p.apply(BigQueryIO.Read.withoutValidation());
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithTableAndQuery() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation. Specifies both a query and a table, only one of these"
+        + " should be provided");
+    p.apply("ReadMyTable",
+        BigQueryIO.Read
+            .from("foo.com:project:somedataset.sometable")
+            .fromQuery("query"));
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithTableAndFlatten() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation. Specifies a"
+              + " table with a result flattening preference, which is not configurable");
+    p.apply("ReadMyTable",
+        BigQueryIO.Read
+            .from("foo.com:project:somedataset.sometable")
+            .withoutResultFlattening());
+    p.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSourceWithTableAndFlattenWithoutValidation() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "Invalid BigQuery read operation. Specifies a"
+              + " table with a result flattening preference, which is not configurable");
+    p.apply(
+        BigQueryIO.Read
+            .from("foo.com:project:somedataset.sometable")
+            .withoutValidation()
+            .withoutResultFlattening());
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testReadFromTable() {
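+    // Scripted fake services: two job starts succeed and the reader yields three rows.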
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done")
+            .pollJobReturns(Status.UNKNOWN)
+            .verifyExecutingProject(bqOptions.getProject()))
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", 1)),
+            toJsonString(new TableRow().set("name", "b").set("number", 2)),
+            toJsonString(new TableRow().set("name", "c").set("number", 3)));
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    PCollection<String> output = p
+        .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
+            .withTestServices(fakeBqServices)
+            .withoutValidation())
+        .apply(ParDo.of(new DoFn<TableRow, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            c.output((String) c.element().get("name"));
+          }
+        }));
+
+    PAssert.that(output)
+        .containsInAnyOrder(ImmutableList.of("a", "b", "c"));
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCustomSink() throws Exception {
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done", "done")
+            .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED));
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(Create.of(
+        new TableRow().set("name", "a").set("number", 1),
+        new TableRow().set("name", "b").set("number", 2),
+        new TableRow().set("name", "c").set("number", 3))
+        .withCoder(TableRowJsonCoder.of()))
+    .apply(BigQueryIO.Write.to("dataset-id.table-id")
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withSchema(new TableSchema().setFields(
+            ImmutableList.of(
+                new TableFieldSchema().setName("name").setType("STRING"),
+                new TableFieldSchema().setName("number").setType("INTEGER"))))
+        .withTestServices(fakeBqServices)
+        .withoutValidation());
+    p.run();
+
+    logged.verifyInfo("Starting BigQuery load job");
+    logged.verifyInfo("Previous load jobs failed, retrying.");
+    File tempDir = new File(bqOptions.getTempLocation());
+    assertEquals(0, tempDir.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File pathname) {
+        return pathname.isFile();
+      }}).length);
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testCustomSinkUnknown() throws Exception {
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(new FakeJobService()
+            .startJobReturns("done", "done")
+            .pollJobReturns(Status.FAILED, Status.UNKNOWN));
+
+    Pipeline p = TestPipeline.create(bqOptions);
+    p.apply(Create.of(
+        new TableRow().set("name", "a").set("number", 1),
+        new TableRow().set("name", "b").set("number", 2),
+        new TableRow().set("name", "c").set("number", 3))
+        .withCoder(TableRowJsonCoder.of()))
+    .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
+        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
+        .withTestServices(fakeBqServices)
+        .withoutValidation());
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("Failed to poll the load job status.");
+    p.run();
+
+    File tempDir = new File(bqOptions.getTempLocation());
+    assertEquals(0, tempDir.listFiles(new FileFilter() {
+      @Override
+      public boolean accept(File pathname) {
+        return pathname.isFile();
+      }}).length);
+  }
+
+  @Test
+  public void testBuildSourceDisplayData() {
+    String tableSpec = "project:dataset.tableid";
+
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .from(tableSpec)
+        .fromQuery("myQuery")
+        .withoutResultFlattening()
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("table", tableSpec));
+    assertThat(displayData, hasDisplayItem("query", "myQuery"));
+    assertThat(displayData, hasDisplayItem("flattenResults", false));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .from("project:dataset.tableId")
+        .withTestServices(new FakeBigQueryServices()
+            .withDatasetService(mockDatasetService)
+            .withJobService(mockJobService))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the table spec in its primitive display data",
+        displayData, hasItem(hasDisplayItem("table")));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    BigQueryIO.Read.Bound read = BigQueryIO.Read
+        .fromQuery("foobar")
+        .withTestServices(new FakeBigQueryServices()
+            .withDatasetService(mockDatasetService)
+            .withJobService(mockJobService))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
+    assertThat("BigQueryIO.Read should include the query in its primitive display data",
+        displayData, hasItem(hasDisplayItem("query")));
+  }
+
+  @Test
+  public void testBuildSink() {
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  public void testBatchSinkPrimitiveDisplayData() throws IOException, InterruptedException {
+    testSinkPrimitiveDisplayData(/* streaming: */ false);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
+  public void testStreamingSinkPrimitiveDisplayData() throws IOException, InterruptedException {
+    testSinkPrimitiveDisplayData(/* streaming: */ true);
+  }
+
+  private void testSinkPrimitiveDisplayData(boolean streaming) throws IOException,
+      InterruptedException {
+    PipelineOptions options = TestPipeline.testingPipelineOptions();
+    options.as(StreamingOptions.class).setStreaming(streaming);
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
+
+    BigQueryIO.Write.Bound write = BigQueryIO.Write
+        .to("project:dataset.table")
+        .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
+        .withTestServices(new FakeBigQueryServices()
+          .withDatasetService(mockDatasetService)
+          .withJobService(mockJobService))
+        .withoutValidation();
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
+        displayData, hasItem(hasDisplayItem("tableSpec")));
+
+    assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
+        displayData, hasItem(hasDisplayItem("schema")));
+  }
+
+  @Test
+  public void testBuildSinkWithoutValidation() {
+    // This test just checks that using withoutValidation will not trigger object
+    // construction errors.
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
+    checkWriteObjectWithValidate(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false);
+  }
+
+  @Test
+  public void testBuildSinkDefaultProject() {
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
+    checkWriteObject(
+        bound, null, "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
+  }
+
+  @Test
+  public void testBuildSinkWithTableReference() {
+    TableReference table = new TableReference()
+        .setProjectId("foo.com:project")
+        .setDatasetId("somedataset")
+        .setTableId("sometable");
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBuildSinkWithoutTable() {
+    Pipeline p = TestPipeline.create();
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("must set the table reference");
+    p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
+        .apply(BigQueryIO.Write.withoutValidation());
+  }
+
+  @Test
+  public void testBuildSinkWithSchema() {
+    TableSchema schema = new TableSchema();
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
+  }
+
+  @Test
+  public void testBuildSinkWithCreateDispositionNever() {
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+        .to("foo.com:project:somedataset.sometable")
+        .withCreateDisposition(CreateDisposition.CREATE_NEVER);
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY);
+  }
+
+  @Test
+  public void testBuildSinkWithCreateDispositionIfNeeded() {
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+        .to("foo.com:project:somedataset.sometable")
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
+  }
+
+  @Test
+  public void testBuildSinkWithWriteDispositionTruncate() {
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+        .to("foo.com:project:somedataset.sometable")
+        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE);
+  }
+
+  @Test
+  public void testBuildSinkWithWriteDispositionAppend() {
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+        .to("foo.com:project:somedataset.sometable")
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND);
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND);
+  }
+
+  @Test
+  public void testBuildSinkWithWriteDispositionEmpty() {
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
+        .to("foo.com:project:somedataset.sometable")
+        .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
+    checkWriteObject(
+        bound, "foo.com:project", "somedataset", "sometable",
+        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
+  }
+
+  @Test
+  public void testBuildSinkDisplayData() {
+    String tableSpec = "project:dataset.table";
+    TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
+
+    BigQueryIO.Write.Bound write = BigQueryIO.Write
+        .to(tableSpec)
+        .withSchema(schema)
+        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("table"));
+    assertThat(displayData, hasDisplayItem("schema"));
+    assertThat(displayData,
+        hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString()));
+    assertThat(displayData,
+        hasDisplayItem("writeDisposition", WriteDisposition.WRITE_APPEND.toString()));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
+  private void testWriteValidatesDataset(boolean streaming) throws Exception {
+    String projectId = "someproject";
+    String datasetId = "somedataset";
+
+    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
+    options.setProject(projectId);
+    options.setStreaming(streaming);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService);
+    when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
+        new RuntimeException("Unable to confirm BigQuery dataset presence"));
+
+    Pipeline p = TestPipeline.create(options);
+
+    TableReference tableRef = new TableReference();
+    tableRef.setDatasetId(datasetId);
+    tableRef.setTableId("sometable");
+
+    thrown.expect(RuntimeException.class);
+    // The message will be one of the following, depending on the execution environment.
+    thrown.expectMessage(
+        Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
+            .or(Matchers.containsString("BigQuery dataset not found for table")));
+    p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
+     .apply(BigQueryIO.Write
+         .to(tableRef)
+         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+         .withSchema(new TableSchema())
+         .withTestServices(fakeBqServices));
+  }
+
+  @Test
+  public void testWriteValidatesDatasetBatch() throws Exception {
+    testWriteValidatesDataset(false);
+  }
+
+  @Test
+  public void testWriteValidatesDatasetStreaming() throws Exception {
+    testWriteValidatesDataset(true);
+  }
+
+  @Test
+  public void testTableParsing() {
+    TableReference ref = BigQueryIO
+        .parseTableSpec("my-project:data_set.table_name");
+    Assert.assertEquals("my-project", ref.getProjectId());
+    Assert.assertEquals("data_set", ref.getDatasetId());
+    Assert.assertEquals("table_name", ref.getTableId());
+  }
+
+  @Test
+  public void testTableParsing_validPatterns() {
+    BigQueryIO.parseTableSpec("a123-456:foo_bar.d");
+    BigQueryIO.parseTableSpec("a12345:b.c");
+    BigQueryIO.parseTableSpec("b12345.c");
+  }
+
+  @Test
+  public void testTableParsing_noProjectId() {
+    TableReference ref = BigQueryIO
+        .parseTableSpec("data_set.table_name");
+    Assert.assertEquals(null, ref.getProjectId());
+    Assert.assertEquals("data_set", ref.getDatasetId());
+    Assert.assertEquals("table_name", ref.getTableId());
+  }
+
+  @Test
+  public void testTableParsingError() {
+    thrown.expect(IllegalArgumentException.class);
+    BigQueryIO.parseTableSpec("0123456:foo.bar");
+  }
+
+  @Test
+  public void testTableParsingError_2() {
+    thrown.expect(IllegalArgumentException.class);
+    BigQueryIO.parseTableSpec("myproject:.bar");
+  }
+
+  @Test
+  public void testTableParsingError_3() {
+    thrown.expect(IllegalArgumentException.class);
+    BigQueryIO.parseTableSpec(":a.b");
+  }
+
+  @Test
+  public void testTableParsingError_slash() {
+    thrown.expect(IllegalArgumentException.class);
+    BigQueryIO.parseTableSpec("a\\b12345:c.d");
+  }
+
+  // Test that BigQuery's special null placeholder objects can be encoded.
+  @Test
+  public void testCoder_nullCell() throws CoderException {
+    TableRow row = new TableRow();
+    row.set("temperature", Data.nullOf(Object.class));
+    row.set("max_temperature", Data.nullOf(Object.class));
+
+    byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), row);
+
+    TableRow newRow = CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes);
+    byte[] newBytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), newRow);
+
+    Assert.assertArrayEquals(bytes, newBytes);
+  }
+
+  @Test
+  public void testBigQueryIOGetName() {
+    assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
+    assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
+  }
+
+  @Test
+  public void testWriteValidateFailsCreateNoSchema() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("no schema was provided");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write
+            .to("dataset.table")
+            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
+  }
+
+  @Test
+  public void testWriteValidateFailsTableAndTableSpec() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("Cannot set both a table reference and a table function");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply(BigQueryIO.Write
+            .to("dataset.table")
+            .to(new SerializableFunction<BoundedWindow, String>() {
+              @Override
+              public String apply(BoundedWindow input) {
+                return null;
+              }
+            }));
+  }
+
+  @Test
+  public void testWriteValidateFailsNoTableAndNoTableSpec() {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
+    TestPipeline.create()
+        .apply(Create.<TableRow>of())
+        .apply("name", BigQueryIO.Write.withoutValidation());
+  }
+
+  @Test
+  public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", "1")),
+            toJsonString(new TableRow().set("name", "b").set("number", "2")),
+            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+    String jobIdToken = "testJobIdToken";
+    TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
+    String extractDestinationDir = "mock://tempLocation";
+    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+        jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"));
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(bqSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+  }
+
+  @Test
+  public void testBigQueryTableSourceInitSplit() throws Exception {
+    Job extractJob = new Job();
+    JobStatistics jobStats = new JobStatistics();
+    JobStatistics4 extractStats = new JobStatistics4();
+    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
+    jobStats.setExtract(extractStats);
+    extractJob.setStatus(new JobStatus())
+        .setStatistics(jobStats);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService)
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", "1")),
+            toJsonString(new TableRow().set("name", "b").set("number", "2")),
+            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+    String jobIdToken = "testJobIdToken";
+    TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
+    String extractDestinationDir = "mock://tempLocation";
+    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
+        jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"));
+
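+    // Polling the extract job returns the fake job, which reports one destination file.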
+    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
+        .thenReturn(extractJob);
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation("mock://tempLocation");
+
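+    // Route the "mock://" scheme to the mock IO factory so no real filesystem is touched.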
+    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    when(mockIOChannelFactory.resolve(anyString(), anyString()))
+        .thenReturn("mock://tempLocation/output");
+    when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(bqSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+
+    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+    assertEquals(1, sources.size());
+    BoundedSource<TableRow> actual = sources.get(0);
+    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+    Mockito.verify(mockJobService)
+        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
+  }
+
+  @Test
+  public void testBigQueryQuerySourceInitSplit() throws Exception {
+    TableReference dryRunTable = new TableReference();
+
+    Job queryJob = new Job();
+    JobStatistics queryJobStats = new JobStatistics();
+    JobStatistics2 queryStats = new JobStatistics2();
+    queryStats.setReferencedTables(ImmutableList.of(dryRunTable));
+    queryJobStats.setQuery(queryStats);
+    queryJob.setStatus(new JobStatus())
+        .setStatistics(queryJobStats);
+
+    Job extractJob = new Job();
+    JobStatistics extractJobStats = new JobStatistics();
+    JobStatistics4 extractStats = new JobStatistics4();
+    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
+    extractJobStats.setExtract(extractStats);
+    extractJob.setStatus(new JobStatus())
+        .setStatistics(extractJobStats);
+
+    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
+        .withJobService(mockJobService)
+        .withDatasetService(mockDatasetService)
+        .readerReturns(
+            toJsonString(new TableRow().set("name", "a").set("number", "1")),
+            toJsonString(new TableRow().set("name", "b").set("number", "2")),
+            toJsonString(new TableRow().set("name", "c").set("number", "3")));
+
+    String jobIdToken = "testJobIdToken";
+    String extractDestinationDir = "mock://tempLocation";
+    TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
+    BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
+        jobIdToken, "query", destinationTable, true /* flattenResults */,
+        extractDestinationDir, fakeBqServices);
+
+    List<TableRow> expected = ImmutableList.of(
+        new TableRow().set("name", "a").set("number", "1"),
+        new TableRow().set("name", "b").set("number", "2"),
+        new TableRow().set("name", "c").set("number", "3"));
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.setTempLocation(extractDestinationDir);
+
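+    // The dry run reports the query's referenced table; its schema is then looked up.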
+    TableReference queryTable = new TableReference()
+        .setProjectId("testProejct")
+        .setDatasetId("testDataset")
+        .setTableId("testTable");
+    when(mockJobService.dryRunQuery(anyString(), anyString()))
+        .thenReturn(new JobStatistics().setQuery(
+            new JobStatistics2()
+                .setTotalBytesProcessed(100L)
+                .setReferencedTables(ImmutableList.of(queryTable))));
+    when(mockDatasetService.getTable(
+        eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId())))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+    when(mockDatasetService.getTable(
+        eq(destinationTable.getProjectId()),
+        eq(destinationTable.getDatasetId()),
+        eq(destinationTable.getTableId())))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
+    when(mockIOChannelFactory.resolve(anyString(), anyString()))
+        .thenReturn("mock://tempLocation/output");
+    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
+        .thenReturn(extractJob);
+
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(bqSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
+
+    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
+    assertEquals(1, sources.size());
+    BoundedSource<TableRow> actual = sources.get(0);
+    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
+
+    Mockito.verify(mockJobService)
+        .startQueryJob(
+            Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
+    Mockito.verify(mockJobService)
+        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
+    Mockito.verify(mockDatasetService)
+        .createDataset(anyString(), anyString(), anyString(), anyString());
+  }
+
+  @Test
+  public void testTransformingSource() throws Exception {
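+    // Wrap a bounded Long source in a TransformingSource that formats each element as a String.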
+    int numElements = 10000;
+    @SuppressWarnings("deprecation")
+    BoundedSource<Long> longSource = CountingSource.upTo(numElements);
+    SerializableFunction<Long, String> toStringFn =
+        new SerializableFunction<Long, String>() {
+          @Override
+          public String apply(Long input) {
+            return input.toString();
+          }};
+    BoundedSource<String> stringSource = new TransformingSource<>(
+        longSource, toStringFn, StringUtf8Coder.of());
+
+    List<String> expected = Lists.newArrayList();
+    for (int i = 0; i < numElements; i++) {
+      expected.add(String.valueOf(i));
+    }
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    Assert.assertThat(
+        SourceTestUtils.readFromSource(stringSource, options),
+        CoreMatchers.is(expected));
+    SourceTestUtils.assertSplitAtFractionBehavior(
+        stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
+
+    SourceTestUtils.assertSourcesEqualReferenceSource(
+        stringSource, stringSource.splitIntoBundles(100, options), options);
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testPassThroughThenCleanup() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    PCollection<Integer> output = p
+        .apply(Create.of(1, 2, 3))
+        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
+          @Override
+          void cleanup(PipelineOptions options) throws Exception {
+            // no-op
+          }}));
+
+    PAssert.that(output).containsInAnyOrder(1, 2, 3);
+
+    p.run();
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testPassThroughThenCleanupExecuted() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    p.apply(Create.<Integer>of())
+        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
+          @Override
+          void cleanup(PipelineOptions options) throws Exception {
+            throw new RuntimeException("cleanup executed");
+          }}));
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("cleanup executed");
+
+    p.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
new file mode 100644
index 0000000..2cdf511
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl.JobServiceImpl;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Transport;
+
+import com.google.api.client.googleapis.json.GoogleJsonError;
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Tests for {@link BigQueryServicesImpl}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryServicesImplTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class);
+  @Mock private LowLevelHttpResponse response;
+  private Bigquery bigquery;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    // A mock transport that lets us mock the API responses.
+    MockHttpTransport transport =
+        new MockHttpTransport.Builder()
+            .setLowLevelHttpRequest(
+                new MockLowLevelHttpRequest() {
+                  @Override
+                  public LowLevelHttpResponse execute() throws IOException {
+                    return response;
+                  }
+                })
+            .build();
+
+    // A sample BigQuery API client that uses the default JsonFactory and RetryHttpRequestInitializer.
+    bigquery =
+        new Bigquery.Builder(
+                transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
+            .build();
+  }
+
+  /**
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds.
+   */
+  @Test
+  public void testStartLoadJobSucceeds() throws IOException, InterruptedException {
+    Job testJob = new Job();
+    JobReference jobRef = new JobReference();
+    jobRef.setJobId("jobId");
+    jobRef.setProjectId("projectId");
+    testJob.setJobReference(jobRef);
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testJob));
+
+    Sleeper sleeper = new FastNanoClockAndSleeper();
+    BackOff backoff = new AttemptBoundedExponentialBackOff(
+        5 /* attempts */, 1000 /* initialIntervalMillis */);
+    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds
+   * when the job already exists.
+   */
+  @Test
+  public void testStartLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException {
+    Job testJob = new Job();
+    JobReference jobRef = new JobReference();
+    jobRef.setJobId("jobId");
+    jobRef.setProjectId("projectId");
+    testJob.setJobReference(jobRef);
+
+    when(response.getStatusCode()).thenReturn(409); // 409 means already exists
+
+    Sleeper sleeper = new FastNanoClockAndSleeper();
+    BackOff backoff = new AttemptBoundedExponentialBackOff(
+        5 /* attempts */, 1000 /* initialIntervalMillis */);
+    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds with a retry.
+   */
+  @Test
+  public void testStartLoadJobRetry() throws IOException, InterruptedException {
+    Job testJob = new Job();
+    JobReference jobRef = new JobReference();
+    jobRef.setJobId("jobId");
+    jobRef.setProjectId("projectId");
+    testJob.setJobReference(jobRef);
+
+    // First response is 403 rate limited, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(testJob));
+
+    Sleeper sleeper = new FastNanoClockAndSleeper();
+    BackOff backoff = new AttemptBoundedExponentialBackOff(
+        5 /* attempts */, 1000 /* initialIntervalMillis */);
+    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
+
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+  }
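+
+  // A note on the retry plumbing above (describing the test utilities as used here, not a
+  // general contract): AttemptBoundedExponentialBackOff(5, 1000) permits at most 5 attempts,
+  // with sleep intervals growing roughly exponentially from about one second, while
+  // FastNanoClockAndSleeper advances a virtual clock instead of actually sleeping, so the
+  // retry path is exercised without real delays.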
+
+  /**
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} succeeds.
+   */
+  @Test
+  public void testPollJobSucceeds() throws IOException, InterruptedException {
+    Job testJob = new Job();
+    testJob.setStatus(new JobStatus().setState("DONE"));
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testJob));
+
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+
+    assertEquals(testJob, job);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests {@link BigQueryServicesImpl.JobServiceImpl#pollJob} on a job that completed
+   * with an error result.
+   */
+  @Test
+  public void testPollJobFailed() throws IOException, InterruptedException {
+    Job testJob = new Job();
+    testJob.setStatus(new JobStatus().setState("DONE").setErrorResult(new ErrorProto()));
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testJob));
+
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
+
+    assertEquals(testJob, job);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} returns null when the
+   * job status is unknown and the backoff stops.
+   */
+  @Test
+  public void testPollJobUnknown() throws IOException, InterruptedException {
+    Job testJob = new Job();
+    testJob.setStatus(new JobStatus());
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testJob));
+
+    BigQueryServicesImpl.JobServiceImpl jobService =
+        new BigQueryServicesImpl.JobServiceImpl(bigquery);
+    JobReference jobRef = new JobReference()
+        .setProjectId("projectId")
+        .setJobId("jobId");
+    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
+
+    assertEquals(null, job);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
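+  /**
+   * Tests that {@link BigQueryServicesImpl#executeWithRetries} succeeds on the first try.
+   */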
+  @Test
+  public void testExecuteWithRetries() throws IOException, InterruptedException {
+    Table testTable = new Table();
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testTable));
+
+    Table table = BigQueryServicesImpl.executeWithRetries(
+        bigquery.tables().get("projectId", "datasetId", "tableId"),
+        "Failed to get table.",
+        Sleeper.DEFAULT,
+        BackOff.STOP_BACKOFF);
+
+    assertEquals(testTable, table);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /** A helper to wrap a {@link GenericJson} object in a content stream. */
+  private static InputStream toStream(GenericJson content) throws IOException {
+    return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
+  }
+
+  /** A helper that generates the error JSON payload that Google APIs produce. */
+  private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) {
+    ErrorInfo info = new ErrorInfo();
+    info.setReason(reason);
+    info.setDomain("global");
+    // GoogleJsonError contains one or more ErrorInfo objects; our utilities read the first one.
+    GoogleJsonError error = new GoogleJsonError();
+    error.setErrors(ImmutableList.of(info));
+    error.setCode(status);
+    // The actual JSON response is an error container.
+    GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
+    container.setError(error);
+    return container;
+  }
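+
+  // For reference, the container built above serializes to JSON of roughly this shape
+  // (a sketch; exact field order may vary):
+  //   {"error": {"errors": [{"domain": "global", "reason": "rateLimitExceeded"}], "code": 403}}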
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
new file mode 100644
index 0000000..c29da91
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserterTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Verify.verifyNotNull;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.ExpectedLogs;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.util.Transport;
+
+import com.google.api.client.googleapis.json.GoogleJsonError;
+import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
+import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.http.LowLevelHttpResponse;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.Json;
+import com.google.api.client.json.jackson2.JacksonFactory;
+import com.google.api.client.testing.http.MockHttpTransport;
+import com.google.api.client.testing.http.MockLowLevelHttpRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.hadoop.util.RetryBoundedBackOff;
+import com.google.common.collect.ImmutableList;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests of {@link BigQueryTableInserter}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryTableInserterTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class);
+  @Mock private LowLevelHttpResponse response;
+  private Bigquery bigquery;
+  private PipelineOptions options;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    // A mock transport that lets us mock the API responses.
+    MockHttpTransport transport =
+        new MockHttpTransport.Builder()
+            .setLowLevelHttpRequest(
+                new MockLowLevelHttpRequest() {
+                  @Override
+                  public LowLevelHttpResponse execute() throws IOException {
+                    return response;
+                  }
+                })
+            .build();
+
+    // A sample BigQuery API client that uses the default JsonFactory and a
+    // RetryHttpRequestInitializer.
+    bigquery =
+        new Bigquery.Builder(
+                transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
+            .build();
+
+    options = PipelineOptionsFactory.create();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // These three interactions happen during normal response parsing for every request.
+    verify(response, atLeastOnce()).getContentEncoding();
+    verify(response, atLeastOnce()).getHeaderCount();
+    verify(response, atLeastOnce()).getReasonPhrase();
+    verifyNoMoreInteractions(response);
+  }
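+
+  // Because tearDown calls verifyNoMoreInteractions, each test below must explicitly verify
+  // every stubbed interaction it triggers; this is why the tests end with verify(...) calls
+  // on getStatusCode(), getContent(), and getContentType().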
+
+  /** A helper to wrap a {@link GenericJson} object in a content stream. */
+  private static InputStream toStream(GenericJson content) throws IOException {
+    return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
+  }
+
+  /** A helper that generates the error JSON payload that Google APIs produce. */
+  private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) {
+    ErrorInfo info = new ErrorInfo();
+    info.setReason(reason);
+    info.setDomain("global");
+    // GoogleJsonError contains one or more ErrorInfo objects; our utilities read the first one.
+    GoogleJsonError error = new GoogleJsonError();
+    error.setErrors(ImmutableList.of(info));
+    error.setCode(status);
+    // The actual JSON response is an error container.
+    GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
+    container.setError(error);
+    return container;
+  }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#tryCreateTable} succeeds on the first try.
+   */
+  @Test
+  public void testCreateTableSucceeds() throws IOException {
+    Table testTable = new Table().setDescription("a table");
+
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(200);
+    when(response.getContent()).thenReturn(toStream(testTable));
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
+    Table ret =
+        inserter.tryCreateTable(
+            new Table(),
+            "project",
+            "dataset",
+            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
+            Sleeper.DEFAULT);
+    assertEquals(testTable, ret);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#tryCreateTable} returns null when the table
+   * already exists.
+   */
+  @Test
+  public void testCreateTableSucceedsAlreadyExists() throws IOException {
+    when(response.getStatusCode()).thenReturn(409); // 409 means already exists
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
+    Table ret =
+        inserter.tryCreateTable(
+            new Table(),
+            "project",
+            "dataset",
+            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
+            Sleeper.DEFAULT);
+
+    assertNull(ret);
+    verify(response, times(1)).getStatusCode();
+    verify(response, times(1)).getContent();
+    verify(response, times(1)).getContentType();
+  }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#tryCreateTable} retries rate-limited attempts.
+   */
+  @Test
+  public void testCreateTableRetry() throws IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    Table testTable = new Table().setTableReference(ref);
+
+    // First response is 403 rate limited, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(testTable));
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
+    Table ret =
+        inserter.tryCreateTable(
+            testTable,
+            "project",
+            "dataset",
+            new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
+            Sleeper.DEFAULT);
+    assertEquals(testTable, ret);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    verifyNotNull(ret.getTableReference());
+    expectedLogs.verifyInfo(
+        "Quota limit reached when creating table project:dataset.table, "
+            + "retrying up to 5.0 minutes");
+  }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#tryCreateTable} does not retry non-rate-limited
+   * failures.
+   */
+  @Test
+  public void testCreateTableDoesNotRetry() throws IOException {
+    Table testTable = new Table().setDescription("a table");
+
+    // First response is 403 not-rate-limited, second response has valid payload but should not
+    // be invoked.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
+        .thenReturn(toStream(testTable));
+
+    thrown.expect(GoogleJsonResponseException.class);
+    thrown.expectMessage("actually forbidden");
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
+    try {
+      inserter.tryCreateTable(
+          new Table(),
+          "project",
+          "dataset",
+          new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
+          Sleeper.DEFAULT);
+      fail();
+    } catch (IOException e) {
+      verify(response, times(1)).getStatusCode();
+      verify(response, times(1)).getContent();
+      verify(response, times(1)).getContentType();
+      throw e;
+    }
+  }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#insertAll} retries rate-limited attempts.
+   */
+  @Test
+  public void testInsertRetry() throws IOException {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = new ArrayList<>();
+    rows.add(new TableRow());
+
+    // First response is 403 rate limited, second response has valid payload.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
+        .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
+
+    inserter.insertAll(ref, rows);
+    verify(response, times(2)).getStatusCode();
+    verify(response, times(2)).getContent();
+    verify(response, times(2)).getContentType();
+    expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
+  }
+
+  /**
+   * Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited
+   * failures.
+   */
+  @Test
+  public void testInsertDoesNotRetry() throws Throwable {
+    TableReference ref =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    List<TableRow> rows = new ArrayList<>();
+    rows.add(new TableRow());
+
+    // First response is 403 not-rate-limited, second response has valid payload but should not
+    // be invoked.
+    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
+    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
+    when(response.getContent())
+        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
+        .thenReturn(toStream(new TableDataInsertAllResponse()));
+
+    thrown.expect(GoogleJsonResponseException.class);
+    thrown.expectMessage("actually forbidden");
+
+    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
+
+    try {
+      inserter.insertAll(ref, rows);
+      fail();
+    } catch (RuntimeException e) {
+      verify(response, times(1)).getStatusCode();
+      verify(response, times(1)).getContent();
+      verify(response, times(1)).getContentType();
+      throw e.getCause();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
new file mode 100644
index 0000000..457b071
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIteratorTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Tests for {@link BigQueryTableRowIterator}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryTableRowIteratorTest {
+
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Mock private Bigquery mockClient;
+  @Mock private Bigquery.Datasets mockDatasets;
+  @Mock private Bigquery.Datasets.Delete mockDatasetsDelete;
+  @Mock private Bigquery.Datasets.Insert mockDatasetsInsert;
+  @Mock private Bigquery.Jobs mockJobs;
+  @Mock private Bigquery.Jobs.Get mockJobsGet;
+  @Mock private Bigquery.Jobs.Insert mockJobsInsert;
+  @Mock private Bigquery.Tables mockTables;
+  @Mock private Bigquery.Tables.Get mockTablesGet;
+  @Mock private Bigquery.Tables.Delete mockTablesDelete;
+  @Mock private Bigquery.Tabledata mockTabledata;
+  @Mock private Bigquery.Tabledata.List mockTabledataList;
+
+  @Before
+  public void setUp() throws IOException {
+    MockitoAnnotations.initMocks(this);
+    when(mockClient.tabledata()).thenReturn(mockTabledata);
+    when(mockTabledata.list(anyString(), anyString(), anyString())).thenReturn(mockTabledataList);
+
+    when(mockClient.tables()).thenReturn(mockTables);
+    when(mockTables.delete(anyString(), anyString(), anyString())).thenReturn(mockTablesDelete);
+    when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
+
+    when(mockClient.datasets()).thenReturn(mockDatasets);
+    when(mockDatasets.delete(anyString(), anyString())).thenReturn(mockDatasetsDelete);
+    when(mockDatasets.insert(anyString(), any(Dataset.class))).thenReturn(mockDatasetsInsert);
+
+    when(mockClient.jobs()).thenReturn(mockJobs);
+    when(mockJobs.insert(anyString(), any(Job.class))).thenReturn(mockJobsInsert);
+    when(mockJobs.get(anyString(), anyString())).thenReturn(mockJobsGet);
+  }
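+
+  // The stubs above wire up the fluent Bigquery client chain, so each test only needs to stub
+  // the terminal execute() call. For example, a query-insertion round trip resolves as:
+  //   mockClient.jobs().insert("project", job).execute()
+  //       -> mockJobs.insert(...) -> mockJobsInsert.execute()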
+
+  @After
+  public void tearDown() {
+    verifyNoMoreInteractions(mockClient);
+    verifyNoMoreInteractions(mockDatasets);
+    verifyNoMoreInteractions(mockDatasetsDelete);
+    verifyNoMoreInteractions(mockDatasetsInsert);
+    verifyNoMoreInteractions(mockJobs);
+    verifyNoMoreInteractions(mockJobsGet);
+    verifyNoMoreInteractions(mockJobsInsert);
+    verifyNoMoreInteractions(mockTables);
+    verifyNoMoreInteractions(mockTablesDelete);
+    verifyNoMoreInteractions(mockTablesGet);
+    verifyNoMoreInteractions(mockTabledata);
+    verifyNoMoreInteractions(mockTabledataList);
+  }
+
+  private static Table tableWithBasicSchema() {
+    return new Table()
+        .setSchema(
+            new TableSchema()
+                .setFields(
+                    Arrays.asList(
+                        new TableFieldSchema().setName("name").setType("STRING"),
+                        new TableFieldSchema().setName("answer").setType("INTEGER"))));
+  }
+
+  private TableRow rawRow(Object... args) {
+    List<TableCell> cells = new LinkedList<>();
+    for (Object a : args) {
+      cells.add(new TableCell().setV(a));
+    }
+    return new TableRow().setF(cells);
+  }
+
+  private TableDataList rawDataList(TableRow... rows) {
+    return new TableDataList().setRows(Arrays.asList(rows));
+  }
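+
+  // Example: rawDataList(rawRow("Arthur", 42)) yields a TableDataList with a single row whose
+  // cells are ["Arthur", 42], matching the {name, answer} schema from tableWithBasicSchema().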
+
+  /**
+   * Verifies that when the query runs, the correct data is returned and the temporary dataset and
+   * table are both cleaned up.
+   */
+  @Test
+  public void testReadFromQuery() throws IOException, InterruptedException {
+    // Mock job inserting.
+    Job insertedJob = new Job().setJobReference(new JobReference());
+    when(mockJobsInsert.execute()).thenReturn(insertedJob);
+
+    // Mock job polling.
+    JobStatus status = new JobStatus().setState("DONE");
+    TableReference tableRef =
+        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
+    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef);
+    Job getJob =
+        new Job()
+            .setJobReference(new JobReference())
+            .setStatus(status)
+            .setConfiguration(new JobConfiguration().setQuery(queryConfig));
+    when(mockJobsGet.execute()).thenReturn(getJob);
+
+    // Mock table schema fetch.
+    when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
+
+    // Mock table data fetch.
+    when(mockTabledataList.execute()).thenReturn(rawDataList(rawRow("Arthur", 42)));
+
+    // Run the query and verify the results.
+    String query = "SELECT name, count from table";
+    try (BigQueryTableRowIterator iterator =
+            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
+      iterator.open();
+      assertTrue(iterator.advance());
+      TableRow row = iterator.getCurrent();
+
+      assertTrue(row.containsKey("name"));
+      assertTrue(row.containsKey("answer"));
+      assertEquals("Arthur", row.get("name"));
+      assertEquals(42, row.get("answer"));
+
+      assertFalse(iterator.advance());
+    }
+
+    // Temp dataset created and later deleted.
+    verify(mockClient, times(2)).datasets();
+    verify(mockDatasets).insert(anyString(), any(Dataset.class));
+    verify(mockDatasetsInsert).execute();
+    verify(mockDatasets).delete(anyString(), anyString());
+    verify(mockDatasetsDelete).execute();
+    // Job inserted to run the query, polled once.
+    verify(mockClient, times(2)).jobs();
+    verify(mockJobs).insert(anyString(), any(Job.class));
+    verify(mockJobsInsert).execute();
+    verify(mockJobs).get(anyString(), anyString());
+    verify(mockJobsGet).execute();
+    // Temp table fetched after the query finishes, then deleted after reading.
+    verify(mockClient, times(2)).tables();
+    verify(mockTables).get("project", "dataset", "table");
+    verify(mockTablesGet).execute();
+    verify(mockTables).delete(anyString(), anyString(), anyString());
+    verify(mockTablesDelete).execute();
+    // Table data read.
+    verify(mockClient).tabledata();
+    verify(mockTabledata).list("project", "dataset", "table");
+    verify(mockTabledataList).execute();
+  }
+
+  /**
+   * Verifies that when the query fails, the user gets a useful exception and the temporary
+   * dataset is cleaned up. Also verifies that no attempt is made to delete the temporary
+   * table, which is never created.
+   */
+  @Test
+  public void testQueryFailed() throws IOException {
+    // Job can be created.
+    JobReference ref = new JobReference();
+    Job insertedJob = new Job().setJobReference(ref);
+    when(mockJobsInsert.execute()).thenReturn(insertedJob);
+
+    // Job state polled with an error.
+    String errorReason = "bad query";
+    JobStatus status =
+        new JobStatus().setState("DONE").setErrorResult(new ErrorProto().setMessage(errorReason));
+    Job getJob = new Job().setJobReference(ref).setStatus(status);
+    when(mockJobsGet.execute()).thenReturn(getJob);
+
+    String query = "NOT A QUERY";
+    try (BigQueryTableRowIterator iterator =
+            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
+      try {
+        iterator.open();
+        fail();
+      } catch (Exception expected) {
+        // Verify message explains cause and reports the query.
+        assertThat(expected.getMessage(), containsString("failed"));
+        assertThat(expected.getMessage(), containsString(errorReason));
+        assertThat(expected.getMessage(), containsString(query));
+      }
+    }
+
+    // Temp dataset created and later deleted.
+    verify(mockClient, times(2)).datasets();
+    verify(mockDatasets).insert(anyString(), any(Dataset.class));
+    verify(mockDatasetsInsert).execute();
+    verify(mockDatasets).delete(anyString(), anyString());
+    verify(mockDatasetsDelete).execute();
+    // Job inserted to run the query, then polled once.
+    verify(mockClient, times(2)).jobs();
+    verify(mockJobs).insert(anyString(), any(Job.class));
+    verify(mockJobsInsert).execute();
+    verify(mockJobs).get(anyString(), anyString());
+    verify(mockJobsGet).execute();
+  }
+}