You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:03:01 UTC

[06/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
deleted file mode 100644
index 0d1a9f8..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ /dev/null
@@ -1,1231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString;
-import static org.apache.beam.sdk.io.BigQueryIO.toJsonString;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.TableRowJsonCoder;
-import org.apache.beam.sdk.io.BigQueryIO.BigQueryQuerySource;
-import org.apache.beam.sdk.io.BigQueryIO.BigQueryTableSource;
-import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup;
-import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup.CleanupOperation;
-import org.apache.beam.sdk.io.BigQueryIO.Status;
-import org.apache.beam.sdk.io.BigQueryIO.TransformingSource;
-import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition;
-import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
-import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.testing.SourceTestUtils;
-import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.BigQueryServices;
-import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
-import org.apache.beam.sdk.util.BigQueryServices.JobService;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.IOChannelFactory;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.api.client.util.Data;
-import com.google.api.client.util.Strings;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.JobStatistics2;
-import com.google.api.services.bigquery.model.JobStatistics4;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-
-import org.hamcrest.CoreMatchers;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.MockitoAnnotations;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-/**
- * Tests for BigQueryIO.
- */
-@RunWith(JUnit4.class)
-public class BigQueryIOTest implements Serializable {
-
-  // Status.UNKNOWN maps to null
-  private static final Map<Status, Job> JOB_STATUS_MAP = ImmutableMap.of(
-      Status.SUCCEEDED, new Job().setStatus(new JobStatus()),
-      Status.FAILED, new Job().setStatus(new JobStatus().setErrorResult(new ErrorProto())));
-
-  private static class FakeBigQueryServices implements BigQueryServices {
-
-    private String[] jsonTableRowReturns = new String[0];
-    private JobService jobService;
-    private DatasetService datasetService;
-
-    public FakeBigQueryServices withJobService(JobService jobService) {
-      this.jobService = jobService;
-      return this;
-    }
-
-    public FakeBigQueryServices withDatasetService(DatasetService datasetService) {
-      this.datasetService = datasetService;
-      return this;
-    }
-
-    public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) {
-      this.jsonTableRowReturns = jsonTableRowReturns;
-      return this;
-    }
-
-    @Override
-    public JobService getJobService(BigQueryOptions bqOptions) {
-      return jobService;
-    }
-
-    @Override
-    public DatasetService getDatasetService(BigQueryOptions bqOptions) {
-      return datasetService;
-    }
-
-    @Override
-    public BigQueryJsonReader getReaderFromTable(
-        BigQueryOptions bqOptions, TableReference tableRef) {
-      return new FakeBigQueryReader(jsonTableRowReturns);
-    }
-
-    @Override
-    public BigQueryJsonReader getReaderFromQuery(
-        BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
-      return new FakeBigQueryReader(jsonTableRowReturns);
-    }
-
-    private static class FakeBigQueryReader implements BigQueryJsonReader {
-      private static final int UNSTARTED = -1;
-      private static final int CLOSED = Integer.MAX_VALUE;
-
-      private String[] jsonTableRowReturns;
-      private int currIndex;
-
-      FakeBigQueryReader(String[] jsonTableRowReturns) {
-        this.jsonTableRowReturns = jsonTableRowReturns;
-        this.currIndex = UNSTARTED;
-      }
-
-      @Override
-      public boolean start() throws IOException {
-        assertEquals(UNSTARTED, currIndex);
-        currIndex = 0;
-        return currIndex < jsonTableRowReturns.length;
-      }
-
-      @Override
-      public boolean advance() throws IOException {
-        return ++currIndex < jsonTableRowReturns.length;
-      }
-
-      @Override
-      public TableRow getCurrent() throws NoSuchElementException {
-        if (currIndex >= jsonTableRowReturns.length) {
-          throw new NoSuchElementException();
-        }
-        return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class);
-      }
-
-      @Override
-      public void close() throws IOException {
-        currIndex = CLOSED;
-      }
-    }
-  }
-
-  private static class FakeJobService implements JobService, Serializable {
-
-    private Object[] startJobReturns;
-    private Object[] pollJobReturns;
-    private String executingProject;
-    // Both counts will be reset back to zeros after serialization.
-    // This is a work around for DoFn's verifyUnmodified check.
-    private transient int startJobCallsCount;
-    private transient int pollJobStatusCallsCount;
-
-    public FakeJobService() {
-      this.startJobReturns = new Object[0];
-      this.pollJobReturns = new Object[0];
-      this.startJobCallsCount = 0;
-      this.pollJobStatusCallsCount = 0;
-    }
-
-    /**
-     * Sets the return values to mock {@link JobService#startLoadJob},
-     * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}.
-     *
-     * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
-     */
-    public FakeJobService startJobReturns(Object... startJobReturns) {
-      this.startJobReturns = startJobReturns;
-      return this;
-    }
-
-    /**
-     * Sets the return values to mock {@link JobService#pollJob}.
-     *
-     * <p>Throws if the {@link Object} is a {@link Exception}, returns otherwise.
-     */
-    public FakeJobService pollJobReturns(Object... pollJobReturns) {
-      this.pollJobReturns = pollJobReturns;
-      return this;
-    }
-
-    /**
-     * Verifies executing project.
-     */
-    public FakeJobService verifyExecutingProject(String executingProject) {
-      this.executingProject = executingProject;
-      return this;
-    }
-
-    @Override
-    public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
-        throws InterruptedException, IOException {
-      startJob(jobRef);
-    }
-
-    @Override
-    public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
-        throws InterruptedException, IOException {
-      startJob(jobRef);
-    }
-
-    @Override
-    public void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
-        throws IOException, InterruptedException {
-      startJob(jobRef);
-    }
-
-    @Override
-    public Job pollJob(JobReference jobRef, int maxAttempts)
-        throws InterruptedException {
-      if (!Strings.isNullOrEmpty(executingProject)) {
-        checkArgument(
-            jobRef.getProjectId().equals(executingProject),
-            "Project id: %s is not equal to executing project: %s",
-            jobRef.getProjectId(), executingProject);
-      }
-
-      if (pollJobStatusCallsCount < pollJobReturns.length) {
-        Object ret = pollJobReturns[pollJobStatusCallsCount++];
-        if (ret instanceof Job) {
-          return (Job) ret;
-        } else if (ret instanceof Status) {
-          return JOB_STATUS_MAP.get(ret);
-        } else if (ret instanceof InterruptedException) {
-          throw (InterruptedException) ret;
-        } else {
-          throw new RuntimeException("Unexpected return type: " + ret.getClass());
-        }
-      } else {
-        throw new RuntimeException(
-            "Exceeded expected number of calls: " + pollJobReturns.length);
-      }
-    }
-
-    private void startJob(JobReference jobRef) throws IOException, InterruptedException {
-      if (!Strings.isNullOrEmpty(executingProject)) {
-        checkArgument(
-            jobRef.getProjectId().equals(executingProject),
-            "Project id: %s is not equal to executing project: %s",
-            jobRef.getProjectId(), executingProject);
-      }
-
-      if (startJobCallsCount < startJobReturns.length) {
-        Object ret = startJobReturns[startJobCallsCount++];
-        if (ret instanceof IOException) {
-          throw (IOException) ret;
-        } else if (ret instanceof InterruptedException) {
-          throw (InterruptedException) ret;
-        } else {
-          return;
-        }
-      } else {
-        throw new RuntimeException(
-            "Exceeded expected number of calls: " + startJobReturns.length);
-      }
-    }
-
-    @Override
-    public JobStatistics dryRunQuery(String projectId, String query)
-        throws InterruptedException, IOException {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-  @Rule public transient ExpectedException thrown = ExpectedException.none();
-  @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class);
-  @Rule public transient TemporaryFolder testFolder = new TemporaryFolder();
-  @Mock(extraInterfaces = Serializable.class)
-  public transient BigQueryServices.JobService mockJobService;
-  @Mock private transient IOChannelFactory mockIOChannelFactory;
-  @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
-
-  private transient BigQueryOptions bqOptions;
-
-  private void checkReadTableObject(
-      BigQueryIO.Read.Bound bound, String project, String dataset, String table) {
-    checkReadTableObjectWithValidate(bound, project, dataset, table, true);
-  }
-
-  private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
-    checkReadQueryObjectWithValidate(bound, query, true);
-  }
-
-  private void checkReadTableObjectWithValidate(
-      BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) {
-    assertEquals(project, bound.getTable().getProjectId());
-    assertEquals(dataset, bound.getTable().getDatasetId());
-    assertEquals(table, bound.getTable().getTableId());
-    assertNull(bound.query);
-    assertEquals(validate, bound.getValidate());
-  }
-
-  private void checkReadQueryObjectWithValidate(
-      BigQueryIO.Read.Bound bound, String query, boolean validate) {
-    assertNull(bound.getTable());
-    assertEquals(query, bound.query);
-    assertEquals(validate, bound.getValidate());
-  }
-
-  private void checkWriteObject(
-      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
-      TableSchema schema, CreateDisposition createDisposition,
-      WriteDisposition writeDisposition) {
-    checkWriteObjectWithValidate(
-        bound, project, dataset, table, schema, createDisposition, writeDisposition, true);
-  }
-
-  private void checkWriteObjectWithValidate(
-      BigQueryIO.Write.Bound bound, String project, String dataset, String table,
-      TableSchema schema, CreateDisposition createDisposition,
-      WriteDisposition writeDisposition, boolean validate) {
-    assertEquals(project, bound.getTable().getProjectId());
-    assertEquals(dataset, bound.getTable().getDatasetId());
-    assertEquals(table, bound.getTable().getTableId());
-    assertEquals(schema, bound.getSchema());
-    assertEquals(createDisposition, bound.createDisposition);
-    assertEquals(writeDisposition, bound.writeDisposition);
-    assertEquals(validate, bound.validate);
-  }
-
-  @Before
-  public void setUp() throws IOException {
-    bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
-    bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath());
-
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testBuildTableBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
-    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
-  }
-
-  @Test
-  public void testBuildQueryBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
-    checkReadQueryObject(bound, "foo_query");
-  }
-
-  @Test
-  public void testBuildTableBasedSourceWithoutValidation() {
-    // This test just checks that using withoutValidation will not trigger object
-    // construction errors.
-    BigQueryIO.Read.Bound bound =
-        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
-    checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
-  }
-
-  @Test
-  public void testBuildQueryBasedSourceWithoutValidation() {
-    // This test just checks that using withoutValidation will not trigger object
-    // construction errors.
-    BigQueryIO.Read.Bound bound =
-        BigQueryIO.Read.fromQuery("some_query").withoutValidation();
-    checkReadQueryObjectWithValidate(bound, "some_query", false);
-  }
-
-  @Test
-  public void testBuildTableBasedSourceWithDefaultProject() {
-    BigQueryIO.Read.Bound bound =
-        BigQueryIO.Read.from("somedataset.sometable");
-    checkReadTableObject(bound, null, "somedataset", "sometable");
-  }
-
-  @Test
-  public void testBuildSourceWithTableReference() {
-    TableReference table = new TableReference()
-        .setProjectId("foo.com:project")
-        .setDatasetId("somedataset")
-        .setTableId("sometable");
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
-    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
-  }
-
-  @Test
-  public void testValidateReadSetsDefaultProject() throws Exception {
-    String projectId = "someproject";
-    String datasetId = "somedataset";
-    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    options.setProject(projectId);
-
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
-        .withDatasetService(mockDatasetService);
-    when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
-        new RuntimeException("Unable to confirm BigQuery dataset presence"));
-
-    Pipeline p = TestPipeline.create(options);
-
-    TableReference tableRef = new TableReference();
-    tableRef.setDatasetId(datasetId);
-    tableRef.setTableId("sometable");
-
-    thrown.expect(RuntimeException.class);
-    // Message will be one of following depending on the execution environment.
-    thrown.expectMessage(
-        Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
-            .or(Matchers.containsString("BigQuery dataset not found for table")));
-    p.apply(BigQueryIO.Read.from(tableRef)
-        .withTestServices(fakeBqServices));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBuildSourceWithoutTableQueryOrValidation() {
-    Pipeline p = TestPipeline.create();
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQuery read operation, either table reference or query has to be set");
-    p.apply(BigQueryIO.Read.withoutValidation());
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBuildSourceWithTableAndQuery() {
-    Pipeline p = TestPipeline.create();
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQuery read operation. Specifies both a query and a table, only one of these"
-        + " should be provided");
-    p.apply("ReadMyTable",
-        BigQueryIO.Read
-            .from("foo.com:project:somedataset.sometable")
-            .fromQuery("query"));
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBuildSourceWithTableAndFlatten() {
-    Pipeline p = TestPipeline.create();
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQuery read operation. Specifies a"
-              + " table with a result flattening preference, which is not configurable");
-    p.apply("ReadMyTable",
-        BigQueryIO.Read
-            .from("foo.com:project:somedataset.sometable")
-            .withoutResultFlattening());
-    p.run();
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBuildSourceWithTableAndFlattenWithoutValidation() {
-    Pipeline p = TestPipeline.create();
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQuery read operation. Specifies a"
-              + " table with a result flattening preference, which is not configurable");
-    p.apply(
-        BigQueryIO.Read
-            .from("foo.com:project:somedataset.sometable")
-            .withoutValidation()
-            .withoutResultFlattening());
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testReadFromTable() {
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done")
-            .pollJobReturns(Status.UNKNOWN)
-            .verifyExecutingProject(bqOptions.getProject()))
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", 1)),
-            toJsonString(new TableRow().set("name", "b").set("number", 2)),
-            toJsonString(new TableRow().set("name", "c").set("number", 3)));
-
-    Pipeline p = TestPipeline.create(bqOptions);
-    PCollection<String> output = p
-        .apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
-            .withTestServices(fakeBqServices)
-            .withoutValidation())
-        .apply(ParDo.of(new DoFn<TableRow, String>() {
-          @Override
-          public void processElement(ProcessContext c) throws Exception {
-            c.output((String) c.element().get("name"));
-          }
-        }));
-
-    PAssert.that(output)
-        .containsInAnyOrder(ImmutableList.of("a", "b", "c"));
-
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCustomSink() throws Exception {
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done", "done")
-            .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED));
-
-    Pipeline p = TestPipeline.create(bqOptions);
-    p.apply(Create.of(
-        new TableRow().set("name", "a").set("number", 1),
-        new TableRow().set("name", "b").set("number", 2),
-        new TableRow().set("name", "c").set("number", 3))
-        .withCoder(TableRowJsonCoder.of()))
-    .apply(BigQueryIO.Write.to("dataset-id.table-id")
-        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-        .withSchema(new TableSchema().setFields(
-            ImmutableList.of(
-                new TableFieldSchema().setName("name").setType("STRING"),
-                new TableFieldSchema().setName("number").setType("INTEGER"))))
-        .withTestServices(fakeBqServices)
-        .withoutValidation());
-    p.run();
-
-    logged.verifyInfo("Starting BigQuery load job");
-    logged.verifyInfo("Previous load jobs failed, retrying.");
-    File tempDir = new File(bqOptions.getTempLocation());
-    assertEquals(0, tempDir.listFiles(new FileFilter() {
-      @Override
-      public boolean accept(File pathname) {
-        return pathname.isFile();
-      }}).length);
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testCustomSinkUnknown() throws Exception {
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(new FakeJobService()
-            .startJobReturns("done", "done")
-            .pollJobReturns(Status.FAILED, Status.UNKNOWN));
-
-    Pipeline p = TestPipeline.create(bqOptions);
-    p.apply(Create.of(
-        new TableRow().set("name", "a").set("number", 1),
-        new TableRow().set("name", "b").set("number", 2),
-        new TableRow().set("name", "c").set("number", 3))
-        .withCoder(TableRowJsonCoder.of()))
-    .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
-        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
-        .withTestServices(fakeBqServices)
-        .withoutValidation());
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("Failed to poll the load job status.");
-    p.run();
-
-    File tempDir = new File(bqOptions.getTempLocation());
-    assertEquals(0, tempDir.listFiles(new FileFilter() {
-      @Override
-      public boolean accept(File pathname) {
-        return pathname.isFile();
-      }}).length);
-  }
-
-  @Test
-  public void testBuildSourceDisplayData() {
-    String tableSpec = "project:dataset.tableid";
-
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
-        .from(tableSpec)
-        .fromQuery("myQuery")
-        .withoutResultFlattening()
-        .withoutValidation();
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("table", tableSpec));
-    assertThat(displayData, hasDisplayItem("query", "myQuery"));
-    assertThat(displayData, hasDisplayItem("flattenResults", false));
-    assertThat(displayData, hasDisplayItem("validation", false));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
-  public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
-        .from("project:dataset.tableId")
-        .withTestServices(new FakeBigQueryServices()
-            .withDatasetService(mockDatasetService)
-            .withJobService(mockJobService))
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("BigQueryIO.Read should include the table spec in its primitive display data",
-        displayData, hasItem(hasDisplayItem("table")));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
-  public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
-        .fromQuery("foobar")
-        .withTestServices(new FakeBigQueryServices()
-            .withDatasetService(mockDatasetService)
-            .withJobService(mockJobService))
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
-    assertThat("BigQueryIO.Read should include the query in its primitive display data",
-        displayData, hasItem(hasDisplayItem("query")));
-  }
-
-
-  @Test
-  public void testBuildSink() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
-  public void testBatchSinkPrimitiveDisplayData() throws IOException, InterruptedException {
-    testSinkPrimitiveDisplayData(/* streaming: */ false);
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
-  public void testStreamingSinkPrimitiveDisplayData() throws IOException, InterruptedException {
-    testSinkPrimitiveDisplayData(/* streaming: */ true);
-  }
-
-  private void testSinkPrimitiveDisplayData(boolean streaming) throws IOException,
-      InterruptedException {
-    PipelineOptions options = TestPipeline.testingPipelineOptions();
-    options.as(StreamingOptions.class).setStreaming(streaming);
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(options);
-
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
-        .to("project:dataset.table")
-        .withSchema(new TableSchema().set("col1", "type1").set("col2", "type2"))
-        .withTestServices(new FakeBigQueryServices()
-          .withDatasetService(mockDatasetService)
-          .withJobService(mockJobService))
-        .withoutValidation();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("BigQueryIO.Write should include the table spec in its primitive display data",
-        displayData, hasItem(hasDisplayItem("tableSpec")));
-
-    assertThat("BigQueryIO.Write should include the table schema in its primitive display data",
-        displayData, hasItem(hasDisplayItem("schema")));
-  }
-
-  @Test
-  public void testBuildSinkwithoutValidation() {
-    // This test just checks that using withoutValidation will not trigger object
-    // construction errors.
-    BigQueryIO.Write.Bound bound =
-        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
-    checkWriteObjectWithValidate(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false);
-  }
-
-  @Test
-  public void testBuildSinkDefaultProject() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
-    checkWriteObject(
-        bound, null, "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
-  }
-
-  @Test
-  public void testBuildSinkWithTableReference() {
-    TableReference table = new TableReference()
-        .setProjectId("foo.com:project")
-        .setDatasetId("somedataset")
-        .setTableId("sometable");
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBuildSinkWithoutTable() {
-    Pipeline p = TestPipeline.create();
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("must set the table reference");
-    p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write.withoutValidation());
-  }
-
-  @Test
-  public void testBuildSinkWithSchema() {
-    TableSchema schema = new TableSchema();
-    BigQueryIO.Write.Bound bound =
-        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
-  }
-
-  @Test
-  public void testBuildSinkWithCreateDispositionNever() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
-        .to("foo.com:project:somedataset.sometable")
-        .withCreateDisposition(CreateDisposition.CREATE_NEVER);
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_NEVER, WriteDisposition.WRITE_EMPTY);
-  }
-
-  @Test
-  public void testBuildSinkWithCreateDispositionIfNeeded() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
-        .to("foo.com:project:somedataset.sometable")
-        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
-  }
-
-  @Test
-  public void testBuildSinkWithWriteDispositionTruncate() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
-        .to("foo.com:project:somedataset.sometable")
-        .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_TRUNCATE);
-  }
-
-  @Test
-  public void testBuildSinkWithWriteDispositionAppend() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
-        .to("foo.com:project:somedataset.sometable")
-        .withWriteDisposition(WriteDisposition.WRITE_APPEND);
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_APPEND);
-  }
-
-  @Test
-  public void testBuildSinkWithWriteDispositionEmpty() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write
-        .to("foo.com:project:somedataset.sometable")
-        .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
-    checkWriteObject(
-        bound, "foo.com:project", "somedataset", "sometable",
-        null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
-  }
-
-  @Test
-  public void testBuildSinkDisplayData() {
-    String tableSpec = "project:dataset.table";
-    TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
-
-    BigQueryIO.Write.Bound write = BigQueryIO.Write
-        .to(tableSpec)
-        .withSchema(schema)
-        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
-        .withoutValidation();
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("table"));
-    assertThat(displayData, hasDisplayItem("schema"));
-    assertThat(displayData,
-        hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString()));
-    assertThat(displayData,
-        hasDisplayItem("writeDisposition", WriteDisposition.WRITE_APPEND.toString()));
-    assertThat(displayData, hasDisplayItem("validation", false));
-  }
-
-  private void testWriteValidatesDataset(boolean streaming) throws Exception {
-    String projectId = "someproject";
-    String datasetId = "somedataset";
-
-    BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    options.setProject(projectId);
-    options.setStreaming(streaming);
-
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
-        .withDatasetService(mockDatasetService);
-    when(mockDatasetService.getDataset(projectId, datasetId)).thenThrow(
-        new RuntimeException("Unable to confirm BigQuery dataset presence"));
-
-    Pipeline p = TestPipeline.create(options);
-
-    TableReference tableRef = new TableReference();
-    tableRef.setDatasetId(datasetId);
-    tableRef.setTableId("sometable");
-
-    thrown.expect(RuntimeException.class);
-    // Message will be one of following depending on the execution environment.
-    thrown.expectMessage(
-        Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
-            .or(Matchers.containsString("BigQuery dataset not found for table")));
-    p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-     .apply(BigQueryIO.Write
-         .to(tableRef)
-         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
-         .withSchema(new TableSchema())
-         .withTestServices(fakeBqServices));
-  }
-
-  @Test
-  public void testWriteValidatesDatasetBatch() throws Exception {
-    testWriteValidatesDataset(false);
-  }
-
-  @Test
-  public void testWriteValidatesDatasetStreaming() throws Exception {
-    testWriteValidatesDataset(true);
-  }
-
-  @Test
-  public void testTableParsing() {
-    TableReference ref = BigQueryIO
-        .parseTableSpec("my-project:data_set.table_name");
-    Assert.assertEquals("my-project", ref.getProjectId());
-    Assert.assertEquals("data_set", ref.getDatasetId());
-    Assert.assertEquals("table_name", ref.getTableId());
-  }
-
-  @Test
-  public void testTableParsing_validPatterns() {
-    BigQueryIO.parseTableSpec("a123-456:foo_bar.d");
-    BigQueryIO.parseTableSpec("a12345:b.c");
-    BigQueryIO.parseTableSpec("b12345.c");
-  }
-
-  @Test
-  public void testTableParsing_noProjectId() {
-    TableReference ref = BigQueryIO
-        .parseTableSpec("data_set.table_name");
-    Assert.assertEquals(null, ref.getProjectId());
-    Assert.assertEquals("data_set", ref.getDatasetId());
-    Assert.assertEquals("table_name", ref.getTableId());
-  }
-
-  @Test
-  public void testTableParsingError() {
-    thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec("0123456:foo.bar");
-  }
-
-  @Test
-  public void testTableParsingError_2() {
-    thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec("myproject:.bar");
-  }
-
-  @Test
-  public void testTableParsingError_3() {
-    thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec(":a.b");
-  }
-
-  @Test
-  public void testTableParsingError_slash() {
-    thrown.expect(IllegalArgumentException.class);
-    BigQueryIO.parseTableSpec("a\\b12345:c.d");
-  }
-
-  // Test that BigQuery's special null placeholder objects can be encoded.
-  @Test
-  public void testCoder_nullCell() throws CoderException {
-    TableRow row = new TableRow();
-    row.set("temperature", Data.nullOf(Object.class));
-    row.set("max_temperature", Data.nullOf(Object.class));
-
-    byte[] bytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), row);
-
-    TableRow newRow = CoderUtils.decodeFromByteArray(TableRowJsonCoder.of(), bytes);
-    byte[] newBytes = CoderUtils.encodeToByteArray(TableRowJsonCoder.of(), newRow);
-
-    Assert.assertArrayEquals(bytes, newBytes);
-  }
-
-  @Test
-  public void testBigQueryIOGetName() {
-    assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
-    assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
-  }
-
-  @Test
-  public void testWriteValidateFailsCreateNoSchema() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("no schema was provided");
-    TestPipeline.create()
-        .apply(Create.<TableRow>of())
-        .apply(BigQueryIO.Write
-            .to("dataset.table")
-            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
-  }
-
-  @Test
-  public void testWriteValidateFailsTableAndTableSpec() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("Cannot set both a table reference and a table function");
-    TestPipeline.create()
-        .apply(Create.<TableRow>of())
-        .apply(BigQueryIO.Write
-            .to("dataset.table")
-            .to(new SerializableFunction<BoundedWindow, String>() {
-              @Override
-              public String apply(BoundedWindow input) {
-                return null;
-              }
-            }));
-  }
-
-  @Test
-  public void testWriteValidateFailsNoTableAndNoTableSpec() {
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
-    TestPipeline.create()
-        .apply(Create.<TableRow>of())
-        .apply("name", BigQueryIO.Write.withoutValidation());
-  }
-
-  @Test
-  public void testBigQueryTableSourceThroughJsonAPI() throws Exception {
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", "1")),
-            toJsonString(new TableRow().set("name", "b").set("number", "2")),
-            toJsonString(new TableRow().set("name", "c").set("number", "3")));
-
-    String jobIdToken = "testJobIdToken";
-    TableReference table = BigQueryIO.parseTableSpec("project.data_set.table_name");
-    String extractDestinationDir = "mock://tempLocation";
-    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
-
-    List<TableRow> expected = ImmutableList.of(
-        new TableRow().set("name", "a").set("number", "1"),
-        new TableRow().set("name", "b").set("number", "2"),
-        new TableRow().set("name", "c").set("number", "3"));
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(bqSource, options),
-        CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-  }
-
-  @Test
-  public void testBigQueryTableSourceInitSplit() throws Exception {
-    Job extractJob = new Job();
-    JobStatistics jobStats = new JobStatistics();
-    JobStatistics4 extractStats = new JobStatistics4();
-    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
-    jobStats.setExtract(extractStats);
-    extractJob.setStatus(new JobStatus())
-        .setStatistics(jobStats);
-
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
-        .withDatasetService(mockDatasetService)
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", "1")),
-            toJsonString(new TableRow().set("name", "b").set("number", "2")),
-            toJsonString(new TableRow().set("name", "c").set("number", "3")));
-
-    String jobIdToken = "testJobIdToken";
-    TableReference table = BigQueryIO.parseTableSpec("project:data_set.table_name");
-    String extractDestinationDir = "mock://tempLocation";
-    BoundedSource<TableRow> bqSource = BigQueryTableSource.create(
-        jobIdToken, table, extractDestinationDir, fakeBqServices, "project");
-
-    List<TableRow> expected = ImmutableList.of(
-        new TableRow().set("name", "a").set("number", "1"),
-        new TableRow().set("name", "b").set("number", "2"),
-        new TableRow().set("name", "c").set("number", "3"));
-
-    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
-        .thenReturn(extractJob);
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setTempLocation("mock://tempLocation");
-
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
-    when(mockIOChannelFactory.resolve(anyString(), anyString()))
-        .thenReturn("mock://tempLocation/output");
-    when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
-        .thenReturn(new Table().setSchema(new TableSchema()));
-
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(bqSource, options),
-        CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-
-    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
-    assertEquals(1, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-
-    Mockito.verify(mockJobService)
-        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
-  }
-
-  @Test
-  public void testBigQueryQuerySourceInitSplit() throws Exception {
-    TableReference dryRunTable = new TableReference();
-
-    Job queryJob = new Job();
-    JobStatistics queryJobStats = new JobStatistics();
-    JobStatistics2 queryStats = new JobStatistics2();
-    queryStats.setReferencedTables(ImmutableList.of(dryRunTable));
-    queryJobStats.setQuery(queryStats);
-    queryJob.setStatus(new JobStatus())
-        .setStatistics(queryJobStats);
-
-    Job extractJob = new Job();
-    JobStatistics extractJobStats = new JobStatistics();
-    JobStatistics4 extractStats = new JobStatistics4();
-    extractStats.setDestinationUriFileCounts(ImmutableList.of(1L));
-    extractJobStats.setExtract(extractStats);
-    extractJob.setStatus(new JobStatus())
-        .setStatistics(extractJobStats);
-
-    FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
-        .withJobService(mockJobService)
-        .withDatasetService(mockDatasetService)
-        .readerReturns(
-            toJsonString(new TableRow().set("name", "a").set("number", "1")),
-            toJsonString(new TableRow().set("name", "b").set("number", "2")),
-            toJsonString(new TableRow().set("name", "c").set("number", "3")));
-
-    String jobIdToken = "testJobIdToken";
-    String extractDestinationDir = "mock://tempLocation";
-    TableReference destinationTable = BigQueryIO.parseTableSpec("project:data_set.table_name");
-    BoundedSource<TableRow> bqSource = BigQueryQuerySource.create(
-        jobIdToken, "query", destinationTable, true /* flattenResults */,
-        extractDestinationDir, fakeBqServices);
-
-    List<TableRow> expected = ImmutableList.of(
-        new TableRow().set("name", "a").set("number", "1"),
-        new TableRow().set("name", "b").set("number", "2"),
-        new TableRow().set("name", "c").set("number", "3"));
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    options.setTempLocation(extractDestinationDir);
-
-    TableReference queryTable = new TableReference()
-        .setProjectId("testProejct")
-        .setDatasetId("testDataset")
-        .setTableId("testTable");
-    when(mockJobService.dryRunQuery(anyString(), anyString()))
-        .thenReturn(new JobStatistics().setQuery(
-            new JobStatistics2()
-                .setTotalBytesProcessed(100L)
-                .setReferencedTables(ImmutableList.of(queryTable))));
-    when(mockDatasetService.getTable(
-        eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId())))
-        .thenReturn(new Table().setSchema(new TableSchema()));
-    when(mockDatasetService.getTable(
-        eq(destinationTable.getProjectId()),
-        eq(destinationTable.getDatasetId()),
-        eq(destinationTable.getTableId())))
-        .thenReturn(new Table().setSchema(new TableSchema()));
-    IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
-    when(mockIOChannelFactory.resolve(anyString(), anyString()))
-        .thenReturn("mock://tempLocation/output");
-    when(mockJobService.pollJob(Mockito.<JobReference>any(), Mockito.anyInt()))
-        .thenReturn(extractJob);
-
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(bqSource, options),
-        CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options);
-
-    List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
-    assertEquals(1, sources.size());
-    BoundedSource<TableRow> actual = sources.get(0);
-    assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
-
-    Mockito.verify(mockJobService)
-        .startQueryJob(
-            Mockito.<JobReference>any(), Mockito.<JobConfigurationQuery>any());
-    Mockito.verify(mockJobService)
-        .startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
-    Mockito.verify(mockDatasetService)
-        .createDataset(anyString(), anyString(), anyString(), anyString());
-  }
-
-  @Test
-  public void testTransformingSource() throws Exception {
-    int numElements = 10000;
-    @SuppressWarnings("deprecation")
-    BoundedSource<Long> longSource = CountingSource.upTo(numElements);
-    SerializableFunction<Long, String> toStringFn =
-        new SerializableFunction<Long, String>() {
-          @Override
-          public String apply(Long input) {
-            return input.toString();
-         }};
-    BoundedSource<String> stringSource = new TransformingSource<>(
-        longSource, toStringFn, StringUtf8Coder.of());
-
-    List<String> expected = Lists.newArrayList();
-    for (int i = 0; i < numElements; i++) {
-      expected.add(String.valueOf(i));
-    }
-
-    PipelineOptions options = PipelineOptionsFactory.create();
-    Assert.assertThat(
-        SourceTestUtils.readFromSource(stringSource, options),
-        CoreMatchers.is(expected));
-    SourceTestUtils.assertSplitAtFractionBehavior(
-        stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options);
-
-    SourceTestUtils.assertSourcesEqualReferenceSource(
-        stringSource, stringSource.splitIntoBundles(100, options), options);
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testPassThroughThenCleanup() throws Exception {
-    Pipeline p = TestPipeline.create();
-
-    PCollection<Integer> output = p
-        .apply(Create.of(1, 2, 3))
-        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
-          @Override
-          void cleanup(PipelineOptions options) throws Exception {
-            // no-op
-          }}));
-
-    PAssert.that(output).containsInAnyOrder(1, 2, 3);
-
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testPassThroughThenCleanupExecuted() throws Exception {
-    Pipeline p = TestPipeline.create();
-
-    p.apply(Create.<Integer>of())
-        .apply(new PassThroughThenCleanup<Integer>(new CleanupOperation() {
-          @Override
-          void cleanup(PipelineOptions options) throws Exception {
-            throw new RuntimeException("cleanup executed");
-          }}));
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("cleanup executed");
-
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
index 38e921a..b72ab9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/AvroUtilsTest.java
@@ -23,17 +23,10 @@ import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.util.AvroUtils.AvroMetadata;
 
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.Lists;
-
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
 import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.reflect.Nullable;
 import org.junit.Rule;
@@ -109,77 +102,6 @@ public class AvroUtilsTest {
     assertEquals(8, schema.getFields().size());
   }
 
-  @Test
-  public void testConvertGenericRecordToTableRow() throws Exception {
-    TableSchema tableSchema = new TableSchema();
-    List<TableFieldSchema> subFields = Lists.<TableFieldSchema>newArrayList(
-        new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"));
-    /*
-     * Note that the quality and quantity fields do not have their mode set, so they should default
-     * to NULLABLE. This is an important test of BigQuery semantics.
-     *
-     * All the other fields we set in this function are required on the Schema response.
-     *
-     * See https://cloud.google.com/bigquery/docs/reference/v2/tables#schema
-     */
-    List<TableFieldSchema> fields =
-        Lists.<TableFieldSchema>newArrayList(
-            new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"),
-            new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"),
-            new TableFieldSchema().setName("quality").setType("FLOAT") /* default to NULLABLE */,
-            new TableFieldSchema().setName("quantity").setType("INTEGER") /* default to NULLABLE */,
-            new TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
-            new TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
-            new TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE")
-                .setFields(subFields),
-            new TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED")
-                .setFields(subFields));
-    tableSchema.setFields(fields);
-    Schema avroSchema = AvroCoder.of(Bird.class).getSchema();
-
-    {
-      // Test nullable fields.
-      GenericRecord record = new GenericData.Record(avroSchema);
-      record.put("number", 5L);
-      TableRow convertedRow = AvroUtils.convertGenericRecordToTableRow(record, tableSchema);
-      TableRow row = new TableRow()
-          .set("number", "5")
-          .set("associates", new ArrayList<TableRow>());
-      assertEquals(row, convertedRow);
-    }
-    {
-      // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, and FLOAT.
-      GenericRecord record = new GenericData.Record(avroSchema);
-      record.put("number", 5L);
-      record.put("quality", 5.0);
-      record.put("birthday", 5L);
-      record.put("flighted", Boolean.TRUE);
-      TableRow convertedRow = AvroUtils.convertGenericRecordToTableRow(record, tableSchema);
-      TableRow row = new TableRow()
-          .set("number", "5")
-          .set("birthday", "1970-01-01 00:00:00.000005 UTC")
-          .set("quality", 5.0)
-          .set("associates", new ArrayList<TableRow>())
-          .set("flighted", Boolean.TRUE);
-      assertEquals(row, convertedRow);
-    }
-    {
-      // Test repeated fields.
-      Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema();
-      GenericRecord nestedRecord = new GenericData.Record(subBirdSchema);
-      nestedRecord.put("species", "other");
-      GenericRecord record = new GenericData.Record(avroSchema);
-      record.put("number", 5L);
-      record.put("associates", Lists.<GenericRecord>newArrayList(nestedRecord));
-      TableRow convertedRow = AvroUtils.convertGenericRecordToTableRow(record, tableSchema);
-      TableRow row = new TableRow()
-          .set("associates", Lists.<TableRow>newArrayList(
-              new TableRow().set("species", "other")))
-          .set("number", "5");
-      assertEquals(row, convertedRow);
-    }
-  }
-
   /**
    * Pojo class used as the record type in tests.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
deleted file mode 100644
index 3ec2b37..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.testing.ExpectedLogs;
-import org.apache.beam.sdk.testing.FastNanoClockAndSleeper;
-import org.apache.beam.sdk.util.BigQueryServicesImpl.JobServiceImpl;
-
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
-import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.json.Json;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationLoad;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.cloud.hadoop.util.ApiErrorExtractor;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Tests for {@link BigQueryServicesImpl}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryServicesImplTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class);
-  @Mock private LowLevelHttpResponse response;
-  private Bigquery bigquery;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    // A mock transport that lets us mock the API responses.
-    MockHttpTransport transport =
-        new MockHttpTransport.Builder()
-            .setLowLevelHttpRequest(
-                new MockLowLevelHttpRequest() {
-                  @Override
-                  public LowLevelHttpResponse execute() throws IOException {
-                    return response;
-                  }
-                })
-            .build();
-
-    // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer.
-    bigquery =
-        new Bigquery.Builder(
-                transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
-            .build();
-  }
-
-  /**
-   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds.
-   */
-  @Test
-  public void testStartLoadJobSucceeds() throws IOException, InterruptedException {
-    Job testJob = new Job();
-    JobReference jobRef = new JobReference();
-    jobRef.setJobId("jobId");
-    jobRef.setProjectId("projectId");
-    testJob.setJobReference(jobRef);
-
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(200);
-    when(response.getContent()).thenReturn(toStream(testJob));
-
-    TableReference ref = new TableReference();
-    ref.setProjectId("projectId");
-    JobConfigurationLoad loadConfig = new JobConfigurationLoad();
-    loadConfig.setDestinationTable(ref);
-
-    Sleeper sleeper = new FastNanoClockAndSleeper();
-    BackOff backoff = new AttemptBoundedExponentialBackOff(
-        5 /* attempts */, 1000 /* initialIntervalMillis */);
-    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
-
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds
-   * with an already exist job.
-   */
-  @Test
-  public void testStartLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException {
-    Job testJob = new Job();
-    JobReference jobRef = new JobReference();
-    jobRef.setJobId("jobId");
-    jobRef.setProjectId("projectId");
-    testJob.setJobReference(jobRef);
-
-    when(response.getStatusCode()).thenReturn(409); // 409 means already exists
-
-    TableReference ref = new TableReference();
-    ref.setProjectId("projectId");
-    JobConfigurationLoad loadConfig = new JobConfigurationLoad();
-    loadConfig.setDestinationTable(ref);
-
-    Sleeper sleeper = new FastNanoClockAndSleeper();
-    BackOff backoff = new AttemptBoundedExponentialBackOff(
-        5 /* attempts */, 1000 /* initialIntervalMillis */);
-    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
-
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds with a retry.
-   */
-  @Test
-  public void testStartLoadJobRetry() throws IOException, InterruptedException {
-    Job testJob = new Job();
-    JobReference jobRef = new JobReference();
-    jobRef.setJobId("jobId");
-    jobRef.setProjectId("projectId");
-    testJob.setJobReference(jobRef);
-
-    // First response is 403 rate limited, second response has valid payload.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
-        .thenReturn(toStream(testJob));
-
-    TableReference ref = new TableReference();
-    ref.setProjectId("projectId");
-    JobConfigurationLoad loadConfig = new JobConfigurationLoad();
-    loadConfig.setDestinationTable(ref);
-
-    Sleeper sleeper = new FastNanoClockAndSleeper();
-    BackOff backoff = new AttemptBoundedExponentialBackOff(
-        5 /* attempts */, 1000 /* initialIntervalMillis */);
-    JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff);
-
-    verify(response, times(2)).getStatusCode();
-    verify(response, times(2)).getContent();
-    verify(response, times(2)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} succeeds.
-   */
-  @Test
-  public void testPollJobSucceeds() throws IOException, InterruptedException {
-    Job testJob = new Job();
-    testJob.setStatus(new JobStatus().setState("DONE"));
-
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(200);
-    when(response.getContent()).thenReturn(toStream(testJob));
-
-    BigQueryServicesImpl.JobServiceImpl jobService =
-        new BigQueryServicesImpl.JobServiceImpl(bigquery);
-    JobReference jobRef = new JobReference()
-        .setProjectId("projectId")
-        .setJobId("jobId");
-    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
-
-    assertEquals(testJob, job);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} fails.
-   */
-  @Test
-  public void testPollJobFailed() throws IOException, InterruptedException {
-    Job testJob = new Job();
-    testJob.setStatus(new JobStatus().setState("DONE").setErrorResult(new ErrorProto()));
-
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(200);
-    when(response.getContent()).thenReturn(toStream(testJob));
-
-    BigQueryServicesImpl.JobServiceImpl jobService =
-        new BigQueryServicesImpl.JobServiceImpl(bigquery);
-    JobReference jobRef = new JobReference()
-        .setProjectId("projectId")
-        .setJobId("jobId");
-    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);
-
-    assertEquals(testJob, job);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryServicesImpl.JobServiceImpl#pollJob} returns UNKNOWN.
-   */
-  @Test
-  public void testPollJobUnknown() throws IOException, InterruptedException {
-    Job testJob = new Job();
-    testJob.setStatus(new JobStatus());
-
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(200);
-    when(response.getContent()).thenReturn(toStream(testJob));
-
-    BigQueryServicesImpl.JobServiceImpl jobService =
-        new BigQueryServicesImpl.JobServiceImpl(bigquery);
-    JobReference jobRef = new JobReference()
-        .setProjectId("projectId")
-        .setJobId("jobId");
-    Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF);
-
-    assertEquals(null, job);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  @Test
-  public void testExecuteWithRetries() throws IOException, InterruptedException {
-    Table testTable = new Table();
-
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(200);
-    when(response.getContent()).thenReturn(toStream(testTable));
-
-    Table table = BigQueryServicesImpl.executeWithRetries(
-        bigquery.tables().get("projectId", "datasetId", "tableId"),
-        "Failed to get table.",
-        Sleeper.DEFAULT,
-        BackOff.STOP_BACKOFF);
-
-    assertEquals(testTable, table);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /** A helper to wrap a {@link GenericJson} object in a content stream. */
-  private static InputStream toStream(GenericJson content) throws IOException {
-    return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
-  }
-
-  /** A helper that generates the error JSON payload that Google APIs produce. */
-  private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) {
-    ErrorInfo info = new ErrorInfo();
-    info.setReason(reason);
-    info.setDomain("global");
-    // GoogleJsonError contains one or more ErrorInfo objects; our utiities read the first one.
-    GoogleJsonError error = new GoogleJsonError();
-    error.setErrors(ImmutableList.of(info));
-    error.setCode(status);
-    // The actual JSON response is an error container.
-    GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
-    container.setError(error);
-    return container;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
deleted file mode 100644
index 344e916..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableInserterTest.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Verify.verifyNotNull;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.ExpectedLogs;
-
-import com.google.api.client.googleapis.json.GoogleJsonError;
-import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
-import com.google.api.client.googleapis.json.GoogleJsonErrorContainer;
-import com.google.api.client.googleapis.json.GoogleJsonResponseException;
-import com.google.api.client.http.LowLevelHttpResponse;
-import com.google.api.client.json.GenericJson;
-import com.google.api.client.json.Json;
-import com.google.api.client.json.jackson2.JacksonFactory;
-import com.google.api.client.testing.http.MockHttpTransport;
-import com.google.api.client.testing.http.MockLowLevelHttpRequest;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.cloud.hadoop.util.RetryBoundedBackOff;
-import com.google.common.collect.ImmutableList;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Tests of {@link BigQueryTableInserter}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryTableInserterTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryTableInserter.class);
-  @Mock private LowLevelHttpResponse response;
-  private Bigquery bigquery;
-  private PipelineOptions options;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    // A mock transport that lets us mock the API responses.
-    MockHttpTransport transport =
-        new MockHttpTransport.Builder()
-            .setLowLevelHttpRequest(
-                new MockLowLevelHttpRequest() {
-                  @Override
-                  public LowLevelHttpResponse execute() throws IOException {
-                    return response;
-                  }
-                })
-            .build();
-
-    // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer.
-    bigquery =
-        new Bigquery.Builder(
-                transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
-            .build();
-
-    options = PipelineOptionsFactory.create();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    // These three interactions happen for every request in the normal response parsing.
-    verify(response, atLeastOnce()).getContentEncoding();
-    verify(response, atLeastOnce()).getHeaderCount();
-    verify(response, atLeastOnce()).getReasonPhrase();
-    verifyNoMoreInteractions(response);
-  }
-
-  /** A helper to wrap a {@link GenericJson} object in a content stream. */
-  private static InputStream toStream(GenericJson content) throws IOException {
-    return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
-  }
-
-  /** A helper that generates the error JSON payload that Google APIs produce. */
-  private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) {
-    ErrorInfo info = new ErrorInfo();
-    info.setReason(reason);
-    info.setDomain("global");
-    // GoogleJsonError contains one or more ErrorInfo objects; our utiities read the first one.
-    GoogleJsonError error = new GoogleJsonError();
-    error.setErrors(ImmutableList.of(info));
-    error.setCode(status);
-    // The actual JSON response is an error container.
-    GoogleJsonErrorContainer container = new GoogleJsonErrorContainer();
-    container.setError(error);
-    return container;
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} succeeds on the first try.
-   */
-  @Test
-  public void testCreateTableSucceeds() throws IOException {
-    Table testTable = new Table().setDescription("a table");
-
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(200);
-    when(response.getContent()).thenReturn(toStream(testTable));
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    Table ret =
-        inserter.tryCreateTable(
-            new Table(),
-            "project",
-            "dataset",
-            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
-            Sleeper.DEFAULT);
-    assertEquals(testTable, ret);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} succeeds when the table already exists.
-   */
-  @Test
-  public void testCreateTableSucceedsAlreadyExists() throws IOException {
-    when(response.getStatusCode()).thenReturn(409); // 409 means already exists
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    Table ret =
-        inserter.tryCreateTable(
-            new Table(),
-            "project",
-            "dataset",
-            new RetryBoundedBackOff(0, BackOff.ZERO_BACKOFF),
-            Sleeper.DEFAULT);
-
-    assertNull(ret);
-    verify(response, times(1)).getStatusCode();
-    verify(response, times(1)).getContent();
-    verify(response, times(1)).getContentType();
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} retries quota rate limited attempts.
-   */
-  @Test
-  public void testCreateTableRetry() throws IOException {
-    TableReference ref =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    Table testTable = new Table().setTableReference(ref);
-
-    // First response is 403 rate limited, second response has valid payload.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
-        .thenReturn(toStream(testTable));
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    Table ret =
-        inserter.tryCreateTable(
-            testTable,
-            "project",
-            "dataset",
-            new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
-            Sleeper.DEFAULT);
-    assertEquals(testTable, ret);
-    verify(response, times(2)).getStatusCode();
-    verify(response, times(2)).getContent();
-    verify(response, times(2)).getContentType();
-    verifyNotNull(ret.getTableReference());
-    expectedLogs.verifyInfo(
-        "Quota limit reached when creating table project:dataset.table, "
-            + "retrying up to 5.0 minutes");
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter} does not retry non-rate-limited attempts.
-   */
-  @Test
-  public void testCreateTableDoesNotRetry() throws IOException {
-    Table testTable = new Table().setDescription("a table");
-
-    // First response is 403 not-rate-limited, second response has valid payload but should not
-    // be invoked.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
-        .thenReturn(toStream(testTable));
-
-    thrown.expect(GoogleJsonResponseException.class);
-    thrown.expectMessage("actually forbidden");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-    try {
-      inserter.tryCreateTable(
-          new Table(),
-          "project",
-          "dataset",
-          new RetryBoundedBackOff(3, BackOff.ZERO_BACKOFF),
-          Sleeper.DEFAULT);
-      fail();
-    } catch (IOException e) {
-      verify(response, times(1)).getStatusCode();
-      verify(response, times(1)).getContent();
-      verify(response, times(1)).getContentType();
-      throw e;
-    }
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter#insertAll} retries quota rate limited attempts.
-   */
-  @Test
-  public void testInsertRetry() throws IOException {
-    TableReference ref =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = new ArrayList<>();
-    rows.add(new TableRow());
-
-    // First response is 403 rate limited, second response has valid payload.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
-        .thenReturn(toStream(new TableDataInsertAllResponse()));
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-
-    inserter.insertAll(ref, rows);
-    verify(response, times(2)).getStatusCode();
-    verify(response, times(2)).getContent();
-    verify(response, times(2)).getContentType();
-    expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
-  }
-
-  /**
-   * Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts.
-   */
-  @Test
-  public void testInsertDoesNotRetry() throws Throwable {
-    TableReference ref =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    List<TableRow> rows = new ArrayList<>();
-    rows.add(new TableRow());
-
-    // First response is 403 not-rate-limited, second response has valid payload but should not
-    // be invoked.
-    when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
-    when(response.getStatusCode()).thenReturn(403).thenReturn(200);
-    when(response.getContent())
-        .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
-        .thenReturn(toStream(new TableDataInsertAllResponse()));
-
-    thrown.expect(GoogleJsonResponseException.class);
-    thrown.expectMessage("actually forbidden");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery, options);
-
-    try {
-      inserter.insertAll(ref, rows);
-      fail();
-    } catch (RuntimeException e) {
-      verify(response, times(1)).getStatusCode();
-      verify(response, times(1)).getContent();
-      verify(response, times(1)).getContentType();
-      throw e.getCause();
-    }
-  }
-}