Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:03:00 UTC

[05/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java
deleted file mode 100644
index e92f2c6..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Tests for {@link BigQueryTableRowIterator}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryTableRowIteratorTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Mock private Bigquery mockClient;
-  @Mock private Bigquery.Datasets mockDatasets;
-  @Mock private Bigquery.Datasets.Delete mockDatasetsDelete;
-  @Mock private Bigquery.Datasets.Insert mockDatasetsInsert;
-  @Mock private Bigquery.Jobs mockJobs;
-  @Mock private Bigquery.Jobs.Get mockJobsGet;
-  @Mock private Bigquery.Jobs.Insert mockJobsInsert;
-  @Mock private Bigquery.Tables mockTables;
-  @Mock private Bigquery.Tables.Get mockTablesGet;
-  @Mock private Bigquery.Tables.Delete mockTablesDelete;
-  @Mock private Bigquery.Tabledata mockTabledata;
-  @Mock private Bigquery.Tabledata.List mockTabledataList;
-
-  @Before
-  public void setUp() throws IOException {
-    MockitoAnnotations.initMocks(this);
-    when(mockClient.tabledata()).thenReturn(mockTabledata);
-    when(mockTabledata.list(anyString(), anyString(), anyString())).thenReturn(mockTabledataList);
-
-    when(mockClient.tables()).thenReturn(mockTables);
-    when(mockTables.delete(anyString(), anyString(), anyString())).thenReturn(mockTablesDelete);
-    when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
-
-    when(mockClient.datasets()).thenReturn(mockDatasets);
-    when(mockDatasets.delete(anyString(), anyString())).thenReturn(mockDatasetsDelete);
-    when(mockDatasets.insert(anyString(), any(Dataset.class))).thenReturn(mockDatasetsInsert);
-
-    when(mockClient.jobs()).thenReturn(mockJobs);
-    when(mockJobs.insert(anyString(), any(Job.class))).thenReturn(mockJobsInsert);
-    when(mockJobs.get(anyString(), anyString())).thenReturn(mockJobsGet);
-  }
-
-  @After
-  public void tearDown() {
-    verifyNoMoreInteractions(mockClient);
-    verifyNoMoreInteractions(mockDatasets);
-    verifyNoMoreInteractions(mockDatasetsDelete);
-    verifyNoMoreInteractions(mockDatasetsInsert);
-    verifyNoMoreInteractions(mockJobs);
-    verifyNoMoreInteractions(mockJobsGet);
-    verifyNoMoreInteractions(mockJobsInsert);
-    verifyNoMoreInteractions(mockTables);
-    verifyNoMoreInteractions(mockTablesDelete);
-    verifyNoMoreInteractions(mockTablesGet);
-    verifyNoMoreInteractions(mockTabledata);
-    verifyNoMoreInteractions(mockTabledataList);
-  }
-
-  private static Table tableWithBasicSchema() {
-    return new Table()
-        .setSchema(
-            new TableSchema()
-                .setFields(
-                    Arrays.asList(
-                        new TableFieldSchema().setName("name").setType("STRING"),
-                        new TableFieldSchema().setName("answer").setType("INTEGER"))));
-  }
-
-  private TableRow rawRow(Object... args) {
-    List<TableCell> cells = new LinkedList<>();
-    for (Object a : args) {
-      cells.add(new TableCell().setV(a));
-    }
-    return new TableRow().setF(cells);
-  }
-
-  private TableDataList rawDataList(TableRow... rows) {
-    return new TableDataList().setRows(Arrays.asList(rows));
-  }
-
-  /**
-   * Verifies that when the query runs, the correct data is returned and the temporary dataset and
-   * table are both cleaned up.
-   */
-  @Test
-  public void testReadFromQuery() throws IOException, InterruptedException {
-    // Mock job inserting.
-    Job insertedJob = new Job().setJobReference(new JobReference());
-    when(mockJobsInsert.execute()).thenReturn(insertedJob);
-
-    // Mock job polling.
-    JobStatus status = new JobStatus().setState("DONE");
-    TableReference tableRef =
-        new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
-    JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef);
-    Job getJob =
-        new Job()
-            .setJobReference(new JobReference())
-            .setStatus(status)
-            .setConfiguration(new JobConfiguration().setQuery(queryConfig));
-    when(mockJobsGet.execute()).thenReturn(getJob);
-
-    // Mock table schema fetch.
-    when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
-
-    // Mock table data fetch.
-    when(mockTabledataList.execute()).thenReturn(rawDataList(rawRow("Arthur", 42)));
-
-    // Run query and verify
-    String query = "SELECT name, count from table";
-    try (BigQueryTableRowIterator iterator =
-            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
-      iterator.open();
-      assertTrue(iterator.advance());
-      TableRow row = iterator.getCurrent();
-
-      assertTrue(row.containsKey("name"));
-      assertTrue(row.containsKey("answer"));
-      assertEquals("Arthur", row.get("name"));
-      assertEquals(42, row.get("answer"));
-
-      assertFalse(iterator.advance());
-    }
-
-    // Temp dataset created and later deleted.
-    verify(mockClient, times(2)).datasets();
-    verify(mockDatasets).insert(anyString(), any(Dataset.class));
-    verify(mockDatasetsInsert).execute();
-    verify(mockDatasets).delete(anyString(), anyString());
-    verify(mockDatasetsDelete).execute();
-    // Job inserted to run the query, polled once.
-    verify(mockClient, times(2)).jobs();
-    verify(mockJobs).insert(anyString(), any(Job.class));
-    verify(mockJobsInsert).execute();
-    verify(mockJobs).get(anyString(), anyString());
-    verify(mockJobsGet).execute();
-    // Temp table fetched after the query finishes, then deleted after reading.
-    verify(mockClient, times(2)).tables();
-    verify(mockTables).get("project", "dataset", "table");
-    verify(mockTablesGet).execute();
-    verify(mockTables).delete(anyString(), anyString(), anyString());
-    verify(mockTablesDelete).execute();
-    // Table data read.
-    verify(mockClient).tabledata();
-    verify(mockTabledata).list("project", "dataset", "table");
-    verify(mockTabledataList).execute();
-  }
-
-  /**
-   * Verifies that when the query fails, the user gets a useful exception and the temporary dataset
- * is cleaned up. Also verifies that no attempt is made to delete the temporary table, which is
- * never created.
-   */
-  @Test
-  public void testQueryFailed() throws IOException {
-    // Job can be created.
-    JobReference ref = new JobReference();
-    Job insertedJob = new Job().setJobReference(ref);
-    when(mockJobsInsert.execute()).thenReturn(insertedJob);
-
-    // Job state polled with an error.
-    String errorReason = "bad query";
-    JobStatus status =
-        new JobStatus().setState("DONE").setErrorResult(new ErrorProto().setMessage(errorReason));
-    Job getJob = new Job().setJobReference(ref).setStatus(status);
-    when(mockJobsGet.execute()).thenReturn(getJob);
-
-    String query = "NOT A QUERY";
-    try (BigQueryTableRowIterator iterator =
-            BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
-      try {
-        iterator.open();
-        fail();
-      } catch (Exception expected) {
-        // Verify message explains cause and reports the query.
-        assertThat(expected.getMessage(), containsString("failed"));
-        assertThat(expected.getMessage(), containsString(errorReason));
-        assertThat(expected.getMessage(), containsString(query));
-      }
-    }
-
-    // Temp dataset created and later deleted.
-    verify(mockClient, times(2)).datasets();
-    verify(mockDatasets).insert(anyString(), any(Dataset.class));
-    verify(mockDatasetsInsert).execute();
-    verify(mockDatasets).delete(anyString(), anyString());
-    verify(mockDatasetsDelete).execute();
-    // Job inserted to run the query, then polled once.
-    verify(mockClient, times(2)).jobs();
-    verify(mockJobs).insert(anyString(), any(Job.class));
-    verify(mockJobsInsert).execute();
-    verify(mockJobs).get(anyString(), anyString());
-    verify(mockJobsGet).execute();
-  }
-}
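
The deleted test above documents the iterator lifecycle that this commit relocates rather
than changes: fromQuery, open, advance/getCurrent, then close. A minimal usage sketch
follows, assuming the moved BigQueryTableRowIterator keeps the same API; "bigqueryClient"
is a placeholder for a configured Bigquery instance, not a name from this commit.

    // Sketch only: mirrors the calls exercised by the test above.
    String query = "SELECT name, answer FROM dataset.table";
    try (BigQueryTableRowIterator iterator =
            BigQueryTableRowIterator.fromQuery(query, "project", bigqueryClient, null)) {
      iterator.open();  // Inserts a query job and polls it until DONE.
      while (iterator.advance()) {
        TableRow row = iterator.getCurrent();
        System.out.println(row.get("name") + ": " + row.get("answer"));
      }
    }  // close() deletes the temporary dataset and table backing the query.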

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
deleted file mode 100644
index c033a7d..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
-
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Tests for util classes related to BigQuery.
- */
-@RunWith(JUnit4.class)
-public class BigQueryUtilTest {
-
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
-  @Mock private Bigquery mockClient;
-  @Mock private Bigquery.Tables mockTables;
-  @Mock private Bigquery.Tables.Get mockTablesGet;
-  @Mock private Bigquery.Tabledata mockTabledata;
-  @Mock private Bigquery.Tabledata.List mockTabledataList;
-  private PipelineOptions options;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-    this.options = PipelineOptionsFactory.create();
-  }
-
-  @After
-  public void tearDown() {
-    verifyNoMoreInteractions(mockClient);
-    verifyNoMoreInteractions(mockTables);
-    verifyNoMoreInteractions(mockTablesGet);
-    verifyNoMoreInteractions(mockTabledata);
-    verifyNoMoreInteractions(mockTabledataList);
-  }
-
-  private void onInsertAll(List<List<Long>> errorIndicesSequence) throws Exception {
-    when(mockClient.tabledata())
-        .thenReturn(mockTabledata);
-
-    final List<TableDataInsertAllResponse> responses = new ArrayList<>();
-    for (List<Long> errorIndices : errorIndicesSequence) {
-      List<TableDataInsertAllResponse.InsertErrors> errors = new ArrayList<>();
-      for (long i : errorIndices) {
-        TableDataInsertAllResponse.InsertErrors error =
-            new TableDataInsertAllResponse.InsertErrors();
-        error.setIndex(i);
-      }
-      TableDataInsertAllResponse response = new TableDataInsertAllResponse();
-      response.setInsertErrors(errors);
-      responses.add(response);
-    }
-
-    doAnswer(
-        new Answer<Bigquery.Tabledata.InsertAll>() {
-          @Override
-          public Bigquery.Tabledata.InsertAll answer(InvocationOnMock invocation) throws Throwable {
-            Bigquery.Tabledata.InsertAll mockInsertAll = mock(Bigquery.Tabledata.InsertAll.class);
-            when(mockInsertAll.execute())
-                .thenReturn(responses.get(0),
-                    responses.subList(1, responses.size()).toArray(
-                        new TableDataInsertAllResponse[responses.size() - 1]));
-            return mockInsertAll;
-          }
-        })
-        .when(mockTabledata)
-        .insertAll(anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class));
-  }
-
-  private void verifyInsertAll(int expectedRetries) throws IOException {
-    verify(mockClient, times(expectedRetries)).tabledata();
-    verify(mockTabledata, times(expectedRetries))
-        .insertAll(anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class));
-  }
-
-  private void onTableGet(Table table) throws IOException {
-    when(mockClient.tables())
-        .thenReturn(mockTables);
-    when(mockTables.get(anyString(), anyString(), anyString()))
-        .thenReturn(mockTablesGet);
-    when(mockTablesGet.execute())
-        .thenReturn(table);
-  }
-
-  private void verifyTableGet() throws IOException {
-    verify(mockClient).tables();
-    verify(mockTables).get("project", "dataset", "table");
-    verify(mockTablesGet, atLeastOnce()).execute();
-  }
-
-  private void onTableList(TableDataList result) throws IOException {
-    when(mockClient.tabledata())
-        .thenReturn(mockTabledata);
-    when(mockTabledata.list(anyString(), anyString(), anyString()))
-        .thenReturn(mockTabledataList);
-    when(mockTabledataList.execute())
-        .thenReturn(result);
-  }
-
-  private void verifyTabledataList() throws IOException {
-    verify(mockClient, atLeastOnce()).tabledata();
-    verify(mockTabledata, atLeastOnce()).list("project", "dataset", "table");
-    verify(mockTabledataList, atLeastOnce()).execute();
-    // Max results may be set when testing for an empty table.
-    verify(mockTabledataList, atLeast(0)).setMaxResults(anyLong());
-  }
-
-  private Table basicTableSchema() {
-    return new Table()
-        .setSchema(new TableSchema()
-            .setFields(Arrays.asList(
-                new TableFieldSchema()
-                    .setName("name")
-                    .setType("STRING"),
-                new TableFieldSchema()
-                    .setName("answer")
-                    .setType("INTEGER")
-            )));
-  }
-
-  private Table basicTableSchemaWithTime() {
-    return new Table()
-        .setSchema(new TableSchema()
-            .setFields(Arrays.asList(
-                new TableFieldSchema()
-                    .setName("time")
-                    .setType("TIMESTAMP")
-            )));
-  }
-
-  @Test
-  public void testReadWithTime() throws IOException, InterruptedException {
-    // The BigQuery JSON API returns timestamps in the following format: floating-point seconds
-    // since epoch (UTC) with microsecond precision. Test that we faithfully preserve a set of
-    // known values.
-    TableDataList input = rawDataList(
-        rawRow("1.430397296789E9"),
-        rawRow("1.45206228E9"),
-        rawRow("1.452062291E9"),
-        rawRow("1.4520622911E9"),
-        rawRow("1.45206229112E9"),
-        rawRow("1.452062291123E9"),
-        rawRow("1.4520622911234E9"),
-        rawRow("1.45206229112345E9"),
-        rawRow("1.452062291123456E9"));
-    onTableGet(basicTableSchemaWithTime());
-    onTableList(input);
-
-    // Known results, verified against BigQuery's export-to-JSON-on-GCS API.
-    List<String> expected = ImmutableList.of(
-        "2015-04-30 12:34:56.789 UTC",
-        "2016-01-06 06:38:00 UTC",
-        "2016-01-06 06:38:11 UTC",
-        "2016-01-06 06:38:11.1 UTC",
-        "2016-01-06 06:38:11.12 UTC",
-        "2016-01-06 06:38:11.123 UTC",
-        "2016-01-06 06:38:11.1234 UTC",
-        "2016-01-06 06:38:11.12345 UTC",
-        "2016-01-06 06:38:11.123456 UTC");
-
-    // Download the rows, verify the interactions.
-    List<TableRow> rows = new ArrayList<>();
-    try (BigQueryTableRowIterator iterator =
-            BigQueryTableRowIterator.fromTable(
-                BigQueryIO.parseTableSpec("project:dataset.table"), mockClient)) {
-      iterator.open();
-      while (iterator.advance()) {
-        rows.add(iterator.getCurrent());
-      }
-    }
-    verifyTableGet();
-    verifyTabledataList();
-
-    // Verify the timestamps were converted as desired.
-    assertEquals("Expected input and output rows to have the same size",
-        expected.size(), rows.size());
-    for (int i = 0; i < expected.size(); ++i) {
-      assertEquals("i=" + i, expected.get(i), rows.get(i).get("time"));
-    }
-  }
-
-  private TableRow rawRow(Object... args) {
-    List<TableCell> cells = new LinkedList<>();
-    for (Object a : args) {
-      cells.add(new TableCell().setV(a));
-    }
-    return new TableRow().setF(cells);
-  }
-
-  private TableDataList rawDataList(TableRow... rows) {
-    return new TableDataList()
-        .setRows(Arrays.asList(rows));
-  }
-
-  @Test
-  public void testRead() throws IOException, InterruptedException {
-    onTableGet(basicTableSchema());
-
-    TableDataList dataList = rawDataList(rawRow("Arthur", 42));
-    onTableList(dataList);
-
-    try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
-        mockClient)) {
-      iterator.open();
-      Assert.assertTrue(iterator.advance());
-      TableRow row = iterator.getCurrent();
-
-      Assert.assertTrue(row.containsKey("name"));
-      Assert.assertTrue(row.containsKey("answer"));
-      Assert.assertEquals("Arthur", row.get("name"));
-      Assert.assertEquals(42, row.get("answer"));
-
-      Assert.assertFalse(iterator.advance());
-
-      verifyTableGet();
-      verifyTabledataList();
-    }
-  }
-
-  @Test
-  public void testReadEmpty() throws IOException, InterruptedException {
-    onTableGet(basicTableSchema());
-
-    // BigQuery may respond with a page token for an empty table; ensure we
-    // handle it.
-    TableDataList dataList = new TableDataList()
-        .setPageToken("FEED==")
-        .setTotalRows(0L);
-    onTableList(dataList);
-
-    try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
-        mockClient)) {
-      iterator.open();
-
-      Assert.assertFalse(iterator.advance());
-
-      verifyTableGet();
-      verifyTabledataList();
-    }
-  }
-
-  @Test
-  public void testReadMultiPage() throws IOException, InterruptedException {
-    onTableGet(basicTableSchema());
-
-    TableDataList page1 = rawDataList(rawRow("Row1", 1))
-        .setPageToken("page2");
-    TableDataList page2 = rawDataList(rawRow("Row2", 2))
-        .setTotalRows(2L);
-
-    when(mockClient.tabledata())
-        .thenReturn(mockTabledata);
-    when(mockTabledata.list(anyString(), anyString(), anyString()))
-        .thenReturn(mockTabledataList);
-    when(mockTabledataList.execute())
-        .thenReturn(page1)
-        .thenReturn(page2);
-
-    try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
-        mockClient)) {
-      iterator.open();
-
-      List<String> names = new LinkedList<>();
-      while (iterator.advance()) {
-        names.add((String) iterator.getCurrent().get("name"));
-      }
-
-      Assert.assertThat(names, Matchers.hasItems("Row1", "Row2"));
-
-      verifyTableGet();
-      verifyTabledataList();
-      // The second call should have used a page token.
-      verify(mockTabledataList).setPageToken("page2");
-    }
-  }
-
-  @Test
-  public void testReadOpenFailure() throws IOException, InterruptedException {
-    thrown.expect(IOException.class);
-
-    when(mockClient.tables())
-        .thenReturn(mockTables);
-    when(mockTables.get(anyString(), anyString(), anyString()))
-        .thenReturn(mockTablesGet);
-    when(mockTablesGet.execute())
-        .thenThrow(new IOException("No such table"));
-
-    try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
-        BigQueryIO.parseTableSpec("project:dataset.table"),
-        mockClient)) {
-      try {
-        iterator.open(); // throws.
-      } finally {
-        verifyTableGet();
-      }
-    }
-  }
-
-  @Test
-  public void testWriteAppend() throws IOException {
-    onTableGet(basicTableSchema());
-
-    TableReference ref = BigQueryIO
-        .parseTableSpec("project:dataset.table");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
-    inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND,
-        BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-
-    verifyTableGet();
-  }
-
-  @Test
-  public void testWriteEmpty() throws IOException {
-    onTableGet(basicTableSchema());
-
-    TableDataList dataList = new TableDataList().setTotalRows(0L);
-    onTableList(dataList);
-
-    TableReference ref = BigQueryIO
-        .parseTableSpec("project:dataset.table");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
-    inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
-        BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-
-    verifyTableGet();
-    verifyTabledataList();
-  }
-
-  @Test
-  public void testWriteEmptyFail() throws IOException {
-    thrown.expect(IOException.class);
-
-    onTableGet(basicTableSchema());
-
-    TableDataList dataList = rawDataList(rawRow("Arthur", 42));
-    onTableList(dataList);
-
-    TableReference ref = BigQueryIO
-        .parseTableSpec("project:dataset.table");
-
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
-    try {
-      inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
-          BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-    } finally {
-      verifyTableGet();
-      verifyTabledataList();
-    }
-  }
-
-  @Test
-  public void testInsertAll() throws Exception {
-    // Build up a list of error indices to return on successive responses. With 25 rows
-    // inserted 5 per request, this should result in 5 calls to insertAll.
-    List<List<Long>> errorsIndices = new ArrayList<>();
-    errorsIndices.add(Arrays.asList(0L, 5L, 10L, 15L, 20L));
-    errorsIndices.add(Arrays.asList(0L, 2L, 4L));
-    errorsIndices.add(Arrays.asList(0L, 2L));
-    errorsIndices.add(new ArrayList<Long>());
-    onInsertAll(errorsIndices);
-
-    TableReference ref = BigQueryIO
-        .parseTableSpec("project:dataset.table");
-    BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5);
-
-    List<TableRow> rows = new ArrayList<>();
-    List<String> ids = new ArrayList<>();
-    for (int i = 0; i < 25; ++i) {
-      rows.add(rawRow("foo", 1234));
-      ids.add("");
-    }
-
-    InMemoryLongSumAggregator byteCountAggregator = new InMemoryLongSumAggregator("ByteCount");
-    try {
-      inserter.insertAll(ref, rows, ids, byteCountAggregator);
-    } finally {
-      verifyInsertAll(5);
-      // Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"
-      assertEquals("Incorrect byte count", 25L * 23L, byteCountAggregator.getSum());
-    }
-  }
-
-  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
-    private final String name;
-    private long sum = 0;
-
-    public InMemoryLongSumAggregator(String name) {
-      this.name = name;
-    }
-
-    @Override
-    public void addValue(Long value) {
-      sum += value;
-    }
-
-    @Override
-    public String getName() {
-      return name;
-    }
-
-    @Override
-    public CombineFn<Long, ?, Long> getCombineFn() {
-      return new Sum.SumLongFn();
-    }
-
-    public long getSum() {
-      return sum;
-    }
-  }
-}
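
Two usage patterns from the deleted BigQueryUtilTest are worth keeping in mind while the
code moves: reading a table via fromTable/parseTableSpec, and inserting rows via
BigQueryTableInserter.insertAll, which reports per-row failures through
TableDataInsertAllResponse.InsertErrors. A hedged caller-side sketch; "client" and
"options" are placeholders, the third constructor argument is read here as a per-request
row cap (which matches the 5 calls the test verifies for 25 rows), and
byteCountAggregator is any Aggregator<Long, Long> such as the test's
InMemoryLongSumAggregator.

    // Sketch only: signatures as used in the deleted test, not canonical usage.
    TableReference ref = BigQueryIO.parseTableSpec("project:dataset.table");
    BigQueryTableInserter inserter = new BigQueryTableInserter(client, options, 5);

    List<TableRow> rows = new ArrayList<>();
    List<String> ids = new ArrayList<>();  // Per-row insert IDs; the test used empty strings.
    for (int i = 0; i < 25; ++i) {
      rows.add(new TableRow().set("name", "foo").set("answer", 1234));
      ids.add("row-" + i);  // Hypothetical IDs, for illustration only.
    }
    inserter.insertAll(ref, rows, ids, byteCountAggregator);  // 25 rows, 5 per request.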

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
index 83ffaa1..91d74db 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -30,8 +30,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
 import com.google.api.client.auth.oauth2.Credential;
 import com.google.api.client.http.HttpRequest;
 import com.google.api.client.http.HttpResponse;
@@ -46,11 +44,8 @@ import com.google.api.client.testing.http.MockHttpTransport;
 import com.google.api.client.testing.http.MockLowLevelHttpRequest;
 import com.google.api.client.util.NanoClock;
 import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.storage.Storage;
-import com.google.common.collect.ImmutableList;
+import com.google.api.services.storage.Storage.Objects.Get;
 
 import org.hamcrest.Matchers;
 import org.junit.After;
@@ -279,21 +274,18 @@ public class RetryHttpRequestInitializerTest {
           }
         }).build();
 
-    // A sample HTTP request to BigQuery that uses both default Transport and default
+    // A sample HTTP request to Google Cloud Storage that uses both default Transport and default
     // RetryHttpInitializer.
-    Bigquery b = new Bigquery.Builder(
+    Storage storage = new Storage.Builder(
         transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build();
 
-    BigQueryTableInserter inserter = new BigQueryTableInserter(b, PipelineOptionsFactory.create());
-    TableReference t = new TableReference()
-        .setProjectId("project").setDatasetId("dataset").setTableId("table");
+    Get getRequest = storage.objects().get("gs://fake", "file");
 
     try {
-      inserter.insertAll(t, ImmutableList.of(new TableRow()));
+      getRequest.execute();
       fail();
     } catch (Throwable e) {
-      assertThat(e, Matchers.<Throwable>instanceOf(RuntimeException.class));
-      assertThat(e.getCause(), Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
+      assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
       assertEquals(1 + defaultNumberOfRetries, executeCount.get());
     }
   }
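
The change above swaps a BigQuery request for a GCS objects.get so this core-SDK test no
longer touches BigQuery classes, while still exercising the retry path. The wiring
generalizes: RetryHttpRequestInitializer can be handed to any google-api-client service
Builder. A short sketch; "transport" and the application name are placeholders, not
values from this commit.

    // Sketch only: same builder pattern as the test above.
    Storage storage = new Storage.Builder(
            transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())
        .setApplicationName("retry-example")  // Placeholder.
        .build();
    // Requests issued through this client retry transient failures (for example, the
    // socket timeouts this test injects) before surfacing the exception; the test
    // asserts 1 + defaultNumberOfRetries low-level attempts.
    storage.objects().get("bucket", "object").execute();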

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index b86f3b9..4ee83cf 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -107,6 +107,21 @@
     <!-- Build dependencies -->
 
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.cloud.bigdataoss</groupId>
+      <artifactId>util</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.cloud.datastore</groupId>
       <artifactId>datastore-v1beta3-proto-client</artifactId>
     </dependency>
@@ -122,6 +137,11 @@
     </dependency>
 
     <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.cloud.bigtable</groupId>
       <artifactId>bigtable-protos</artifactId>
       <version>${bigtable.version}</version>
@@ -146,11 +166,21 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.api-client</groupId>
+      <artifactId>google-api-client</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.http-client</groupId>
       <artifactId>google-http-client</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-jackson2</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.oauth-client</groupId>
       <artifactId>google-oauth-client</artifactId>
     </dependency>
@@ -183,6 +213,11 @@
       <scope>runtime</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+
     <!--  test -->
     <dependency>
       <groupId>org.apache.beam</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
new file mode 100644
index 0000000..48e2258
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Verify.verify;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericRecord;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * A set of utilities for working with Avro files.
+ *
+ * <p>These utilities are based on the <a
+ * href="https://avro.apache.org/docs/1.8.1/spec.html">Avro 1.8.1</a> specification.
+ */
+class BigQueryAvroUtils {
+
+  /**
+   * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
+   * immutable.
+   */
+  private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
+      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
+
+  // Package-private for BigQueryTableRowIterator to use.
+  static String formatTimestamp(String timestamp) {
+    // timestamp is in "seconds since epoch" format, with scientific notation.
+    // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
+    // Separate into seconds and microseconds.
+    double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
+    long timestampMicros = (long) timestampDoubleMicros;
+    long seconds = timestampMicros / 1000000;
+    int micros = (int) (timestampMicros % 1000000);
+    String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
+
+    // No sub-second component.
+    if (micros == 0) {
+      return String.format("%s UTC", dayAndTime);
+    }
+
+    // Sub-second component.
+    int digits = 6;
+    int subsecond = micros;
+    while (subsecond % 10 == 0) {
+      digits--;
+      subsecond /= 10;
+    }
+    String formatString = String.format("%%0%dd", digits);
+    String fractionalSeconds = String.format(formatString, subsecond);
+    return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
+  }
+
+  /**
+   * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
+   *
+   * See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">
+   * "Avro format"</a> for more information.
+   */
+  static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
+    return convertGenericRecordToTableRow(record, schema.getFields());
+  }
+
+  private static TableRow convertGenericRecordToTableRow(
+      GenericRecord record, List<TableFieldSchema> fields) {
+    TableRow row = new TableRow();
+    for (TableFieldSchema subSchema : fields) {
+      // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
+      // is required, so it may not be null.
+      Field field = record.getSchema().getField(subSchema.getName());
+      Object convertedValue =
+          getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
+      if (convertedValue != null) {
+        // To match the JSON files exported by BigQuery, do not include null values in the output.
+        row.set(field.name(), convertedValue);
+      }
+    }
+    return row;
+  }
+
+  @Nullable
+  private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
+    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
+    // is optional (and so it may be null), but defaults to "NULLABLE".
+    String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
+    switch (mode) {
+      case "REQUIRED":
+        return convertRequiredField(schema.getType(), fieldSchema, v);
+      case "REPEATED":
+        return convertRepeatedField(schema, fieldSchema, v);
+      case "NULLABLE":
+        return convertNullableField(schema, fieldSchema, v);
+      default:
+        throw new UnsupportedOperationException(
+            "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
+    }
+  }
+
+  private static List<Object> convertRepeatedField(
+      Schema schema, TableFieldSchema fieldSchema, Object v) {
+    Type arrayType = schema.getType();
+    verify(
+        arrayType == Type.ARRAY,
+        "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
+        fieldSchema.getName(),
+        arrayType);
+    // REPEATED fields are represented as Avro arrays.
+    if (v == null) {
+      // Handle the case of an empty repeated field.
+      return ImmutableList.of();
+    }
+    @SuppressWarnings("unchecked")
+    List<Object> elements = (List<Object>) v;
+    ImmutableList.Builder<Object> values = ImmutableList.builder();
+    Type elementType = schema.getElementType().getType();
+    for (Object element : elements) {
+      values.add(convertRequiredField(elementType, fieldSchema, element));
+    }
+    return values.build();
+  }
+
+  private static Object convertRequiredField(
+      Type avroType, TableFieldSchema fieldSchema, Object v) {
+    // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
+    // INTEGER type maps to an Avro LONG type.
+    checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
+    ImmutableMap<String, Type> fieldMap =
+        ImmutableMap.<String, Type>builder()
+            .put("STRING", Type.STRING)
+            .put("INTEGER", Type.LONG)
+            .put("FLOAT", Type.DOUBLE)
+            .put("BOOLEAN", Type.BOOLEAN)
+            .put("TIMESTAMP", Type.LONG)
+            .put("RECORD", Type.RECORD)
+            .build();
+    // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
+    // is required, so it may not be null.
+    String bqType = fieldSchema.getType();
+    Type expectedAvroType = fieldMap.get(bqType);
+    verify(
+        avroType == expectedAvroType,
+        "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
+        expectedAvroType,
+        avroType,
+        bqType,
+        fieldSchema.getName());
+    switch (fieldSchema.getType()) {
+      case "STRING":
+        // Avro will use a CharSequence to represent String objects, but it may not always use
+        // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
+        verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
+        return v.toString();
+      case "INTEGER":
+        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
+        return ((Long) v).toString();
+      case "FLOAT":
+        verify(v instanceof Double, "Expected Double, got %s", v.getClass());
+        return v;
+      case "BOOLEAN":
+        verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
+        return v;
+      case "TIMESTAMP":
+        // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
+        // Strings with variable precision (up to six digits) to match the JSON files exported
+        // by BigQuery.
+        verify(v instanceof Long, "Expected Long, got %s", v.getClass());
+        Double doubleValue = ((Long) v) / 1000000.0;
+        return formatTimestamp(doubleValue.toString());
+      case "RECORD":
+        verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
+        return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
+      default:
+        throw new UnsupportedOperationException(
+            String.format(
+                "Unexpected BigQuery field schema type %s for field named %s",
+                fieldSchema.getType(),
+                fieldSchema.getName()));
+    }
+  }
+
+  @Nullable
+  private static Object convertNullableField(
+      Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
+    // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
+    verify(
+        avroSchema.getType() == Type.UNION,
+        "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
+        avroSchema.getType(),
+        fieldSchema.getName());
+    List<Schema> unionTypes = avroSchema.getTypes();
+    verify(
+        unionTypes.size() == 2,
+        "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
+        fieldSchema.getName(),
+        unionTypes);
+
+    if (v == null) {
+      return null;
+    }
+
+    Type firstType = unionTypes.get(0).getType();
+    if (!firstType.equals(Type.NULL)) {
+      return convertRequiredField(firstType, fieldSchema, v);
+    }
+    return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
+  }
+}
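
A worked example of the formatTimestamp path above. The inputs and expected outputs are
the known values carried over from the deleted BigQueryUtilTest#testReadWithTime;
trailing zeros are trimmed, so the fractional part ranges from absent to six digits.
formatTimestamp is package-private, so the sketch assumes same-package access.

    // Sketch only: values taken verbatim from the deleted testReadWithTime.
    BigQueryAvroUtils.formatTimestamp("1.45206228E9");        // "2016-01-06 06:38:00 UTC"
    BigQueryAvroUtils.formatTimestamp("1.452062291E9");       // "2016-01-06 06:38:11 UTC"
    BigQueryAvroUtils.formatTimestamp("1.4520622911E9");      // "2016-01-06 06:38:11.1 UTC"
    BigQueryAvroUtils.formatTimestamp("1.45206229112345E9");  // "2016-01-06 06:38:11.12345 UTC"
    BigQueryAvroUtils.formatTimestamp("1.452062291123456E9"); // "2016-01-06 06:38:11.123456 UTC"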