You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:03:00 UTC
[05/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java
deleted file mode 100644
index e92f2c6..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryTableRowIteratorTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.containsString;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Dataset;
-import com.google.api.services.bigquery.model.ErrorProto;
-import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfiguration;
-import com.google.api.services.bigquery.model.JobConfigurationQuery;
-import com.google.api.services.bigquery.model.JobReference;
-import com.google.api.services.bigquery.model.JobStatus;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Tests for {@link BigQueryTableRowIterator}.
- */
-@RunWith(JUnit4.class)
-public class BigQueryTableRowIteratorTest {
-
- @Rule public ExpectedException thrown = ExpectedException.none();
-
- @Mock private Bigquery mockClient;
- @Mock private Bigquery.Datasets mockDatasets;
- @Mock private Bigquery.Datasets.Delete mockDatasetsDelete;
- @Mock private Bigquery.Datasets.Insert mockDatasetsInsert;
- @Mock private Bigquery.Jobs mockJobs;
- @Mock private Bigquery.Jobs.Get mockJobsGet;
- @Mock private Bigquery.Jobs.Insert mockJobsInsert;
- @Mock private Bigquery.Tables mockTables;
- @Mock private Bigquery.Tables.Get mockTablesGet;
- @Mock private Bigquery.Tables.Delete mockTablesDelete;
- @Mock private Bigquery.Tabledata mockTabledata;
- @Mock private Bigquery.Tabledata.List mockTabledataList;
-
- @Before
- public void setUp() throws IOException {
- MockitoAnnotations.initMocks(this);
- when(mockClient.tabledata()).thenReturn(mockTabledata);
- when(mockTabledata.list(anyString(), anyString(), anyString())).thenReturn(mockTabledataList);
-
- when(mockClient.tables()).thenReturn(mockTables);
- when(mockTables.delete(anyString(), anyString(), anyString())).thenReturn(mockTablesDelete);
- when(mockTables.get(anyString(), anyString(), anyString())).thenReturn(mockTablesGet);
-
- when(mockClient.datasets()).thenReturn(mockDatasets);
- when(mockDatasets.delete(anyString(), anyString())).thenReturn(mockDatasetsDelete);
- when(mockDatasets.insert(anyString(), any(Dataset.class))).thenReturn(mockDatasetsInsert);
-
- when(mockClient.jobs()).thenReturn(mockJobs);
- when(mockJobs.insert(anyString(), any(Job.class))).thenReturn(mockJobsInsert);
- when(mockJobs.get(anyString(), anyString())).thenReturn(mockJobsGet);
- }
-
- @After
- public void tearDown() {
- verifyNoMoreInteractions(mockClient);
- verifyNoMoreInteractions(mockDatasets);
- verifyNoMoreInteractions(mockDatasetsDelete);
- verifyNoMoreInteractions(mockDatasetsInsert);
- verifyNoMoreInteractions(mockJobs);
- verifyNoMoreInteractions(mockJobsGet);
- verifyNoMoreInteractions(mockJobsInsert);
- verifyNoMoreInteractions(mockTables);
- verifyNoMoreInteractions(mockTablesDelete);
- verifyNoMoreInteractions(mockTablesGet);
- verifyNoMoreInteractions(mockTabledata);
- verifyNoMoreInteractions(mockTabledataList);
- }
-
- private static Table tableWithBasicSchema() {
- return new Table()
- .setSchema(
- new TableSchema()
- .setFields(
- Arrays.asList(
- new TableFieldSchema().setName("name").setType("STRING"),
- new TableFieldSchema().setName("answer").setType("INTEGER"))));
- }
-
- private TableRow rawRow(Object... args) {
- List<TableCell> cells = new LinkedList<>();
- for (Object a : args) {
- cells.add(new TableCell().setV(a));
- }
- return new TableRow().setF(cells);
- }
-
- private TableDataList rawDataList(TableRow... rows) {
- return new TableDataList().setRows(Arrays.asList(rows));
- }
-
- /**
- * Verifies that when the query runs, the correct data is returned and the temporary dataset and
- * table are both cleaned up.
- */
- @Test
- public void testReadFromQuery() throws IOException, InterruptedException {
- // Mock job inserting.
- Job insertedJob = new Job().setJobReference(new JobReference());
- when(mockJobsInsert.execute()).thenReturn(insertedJob);
-
- // Mock job polling.
- JobStatus status = new JobStatus().setState("DONE");
- TableReference tableRef =
- new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
- JobConfigurationQuery queryConfig = new JobConfigurationQuery().setDestinationTable(tableRef);
- Job getJob =
- new Job()
- .setJobReference(new JobReference())
- .setStatus(status)
- .setConfiguration(new JobConfiguration().setQuery(queryConfig));
- when(mockJobsGet.execute()).thenReturn(getJob);
-
- // Mock table schema fetch.
- when(mockTablesGet.execute()).thenReturn(tableWithBasicSchema());
-
- // Mock table data fetch.
- when(mockTabledataList.execute()).thenReturn(rawDataList(rawRow("Arthur", 42)));
-
- // Run query and verify
- String query = "SELECT name, count from table";
- try (BigQueryTableRowIterator iterator =
- BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
- iterator.open();
- assertTrue(iterator.advance());
- TableRow row = iterator.getCurrent();
-
- assertTrue(row.containsKey("name"));
- assertTrue(row.containsKey("answer"));
- assertEquals("Arthur", row.get("name"));
- assertEquals(42, row.get("answer"));
-
- assertFalse(iterator.advance());
- }
-
- // Temp dataset created and later deleted.
- verify(mockClient, times(2)).datasets();
- verify(mockDatasets).insert(anyString(), any(Dataset.class));
- verify(mockDatasetsInsert).execute();
- verify(mockDatasets).delete(anyString(), anyString());
- verify(mockDatasetsDelete).execute();
- // Job inserted to run the query, polled once.
- verify(mockClient, times(2)).jobs();
- verify(mockJobs).insert(anyString(), any(Job.class));
- verify(mockJobsInsert).execute();
- verify(mockJobs).get(anyString(), anyString());
- verify(mockJobsGet).execute();
- // Temp table get after query finish, deleted after reading.
- verify(mockClient, times(2)).tables();
- verify(mockTables).get("project", "dataset", "table");
- verify(mockTablesGet).execute();
- verify(mockTables).delete(anyString(), anyString(), anyString());
- verify(mockTablesDelete).execute();
- // Table data read.
- verify(mockClient).tabledata();
- verify(mockTabledata).list("project", "dataset", "table");
- verify(mockTabledataList).execute();
- }
-
- /**
- * Verifies that when the query fails, the user gets a useful exception and the temporary dataset
- * is cleaned up. Also verifies that the temporary table (which is never created) is not
- * erroneously attempted to be deleted.
- */
- @Test
- public void testQueryFailed() throws IOException {
- // Job can be created.
- JobReference ref = new JobReference();
- Job insertedJob = new Job().setJobReference(ref);
- when(mockJobsInsert.execute()).thenReturn(insertedJob);
-
- // Job state polled with an error.
- String errorReason = "bad query";
- JobStatus status =
- new JobStatus().setState("DONE").setErrorResult(new ErrorProto().setMessage(errorReason));
- Job getJob = new Job().setJobReference(ref).setStatus(status);
- when(mockJobsGet.execute()).thenReturn(getJob);
-
- String query = "NOT A QUERY";
- try (BigQueryTableRowIterator iterator =
- BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
- try {
- iterator.open();
- fail();
- } catch (Exception expected) {
- // Verify message explains cause and reports the query.
- assertThat(expected.getMessage(), containsString("failed"));
- assertThat(expected.getMessage(), containsString(errorReason));
- assertThat(expected.getMessage(), containsString(query));
- }
- }
-
- // Temp dataset created and then later deleted.
- verify(mockClient, times(2)).datasets();
- verify(mockDatasets).insert(anyString(), any(Dataset.class));
- verify(mockDatasetsInsert).execute();
- verify(mockDatasets).delete(anyString(), anyString());
- verify(mockDatasetsDelete).execute();
- // Job inserted to run the query, then polled once.
- verify(mockClient, times(2)).jobs();
- verify(mockJobs).insert(anyString(), any(Job.class));
- verify(mockJobsInsert).execute();
- verify(mockJobs).get(anyString(), anyString());
- verify(mockJobsGet).execute();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
deleted file mode 100644
index c033a7d..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryUtilTest.java
+++ /dev/null
@@ -1,485 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.BigQueryIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Sum;
-
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.Table;
-import com.google.api.services.bigquery.model.TableCell;
-import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
-import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
-import com.google.api.services.bigquery.model.TableDataList;
-import com.google.api.services.bigquery.model.TableFieldSchema;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
-import com.google.api.services.bigquery.model.TableSchema;
-import com.google.common.collect.ImmutableList;
-
-import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Tests for util classes related to BigQuery.
- */
-@RunWith(JUnit4.class)
-public class BigQueryUtilTest {
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Mock private Bigquery mockClient;
- @Mock private Bigquery.Tables mockTables;
- @Mock private Bigquery.Tables.Get mockTablesGet;
- @Mock private Bigquery.Tabledata mockTabledata;
- @Mock private Bigquery.Tabledata.List mockTabledataList;
- private PipelineOptions options;
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
- this.options = PipelineOptionsFactory.create();
- }
-
- @After
- public void tearDown() {
- verifyNoMoreInteractions(mockClient);
- verifyNoMoreInteractions(mockTables);
- verifyNoMoreInteractions(mockTablesGet);
- verifyNoMoreInteractions(mockTabledata);
- verifyNoMoreInteractions(mockTabledataList);
- }
-
- private void onInsertAll(List<List<Long>> errorIndicesSequence) throws Exception {
- when(mockClient.tabledata())
- .thenReturn(mockTabledata);
-
- final List<TableDataInsertAllResponse> responses = new ArrayList<>();
- for (List<Long> errorIndices : errorIndicesSequence) {
- List<TableDataInsertAllResponse.InsertErrors> errors = new ArrayList<>();
- for (long i : errorIndices) {
- TableDataInsertAllResponse.InsertErrors error =
- new TableDataInsertAllResponse.InsertErrors();
- error.setIndex(i);
- }
- TableDataInsertAllResponse response = new TableDataInsertAllResponse();
- response.setInsertErrors(errors);
- responses.add(response);
- }
-
- doAnswer(
- new Answer<Bigquery.Tabledata.InsertAll>() {
- @Override
- public Bigquery.Tabledata.InsertAll answer(InvocationOnMock invocation) throws Throwable {
- Bigquery.Tabledata.InsertAll mockInsertAll = mock(Bigquery.Tabledata.InsertAll.class);
- when(mockInsertAll.execute())
- .thenReturn(responses.get(0),
- responses.subList(1, responses.size()).toArray(
- new TableDataInsertAllResponse[responses.size() - 1]));
- return mockInsertAll;
- }
- })
- .when(mockTabledata)
- .insertAll(anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class));
- }
-
- private void verifyInsertAll(int expectedRetries) throws IOException {
- verify(mockClient, times(expectedRetries)).tabledata();
- verify(mockTabledata, times(expectedRetries))
- .insertAll(anyString(), anyString(), anyString(), any(TableDataInsertAllRequest.class));
- }
-
- private void onTableGet(Table table) throws IOException {
- when(mockClient.tables())
- .thenReturn(mockTables);
- when(mockTables.get(anyString(), anyString(), anyString()))
- .thenReturn(mockTablesGet);
- when(mockTablesGet.execute())
- .thenReturn(table);
- }
-
- private void verifyTableGet() throws IOException {
- verify(mockClient).tables();
- verify(mockTables).get("project", "dataset", "table");
- verify(mockTablesGet, atLeastOnce()).execute();
- }
-
- private void onTableList(TableDataList result) throws IOException {
- when(mockClient.tabledata())
- .thenReturn(mockTabledata);
- when(mockTabledata.list(anyString(), anyString(), anyString()))
- .thenReturn(mockTabledataList);
- when(mockTabledataList.execute())
- .thenReturn(result);
- }
-
- private void verifyTabledataList() throws IOException {
- verify(mockClient, atLeastOnce()).tabledata();
- verify(mockTabledata, atLeastOnce()).list("project", "dataset", "table");
- verify(mockTabledataList, atLeastOnce()).execute();
- // Max results may be set when testing for an empty table.
- verify(mockTabledataList, atLeast(0)).setMaxResults(anyLong());
- }
-
- private Table basicTableSchema() {
- return new Table()
- .setSchema(new TableSchema()
- .setFields(Arrays.asList(
- new TableFieldSchema()
- .setName("name")
- .setType("STRING"),
- new TableFieldSchema()
- .setName("answer")
- .setType("INTEGER")
- )));
- }
-
- private Table basicTableSchemaWithTime() {
- return new Table()
- .setSchema(new TableSchema()
- .setFields(Arrays.asList(
- new TableFieldSchema()
- .setName("time")
- .setType("TIMESTAMP")
- )));
- }
-
- @Test
- public void testReadWithTime() throws IOException, InterruptedException {
- // The BigQuery JSON API returns timestamps in the following format: floating-point seconds
- // since epoch (UTC) with microsecond precision. Test that we faithfully preserve a set of
- // known values.
- TableDataList input = rawDataList(
- rawRow("1.430397296789E9"),
- rawRow("1.45206228E9"),
- rawRow("1.452062291E9"),
- rawRow("1.4520622911E9"),
- rawRow("1.45206229112E9"),
- rawRow("1.452062291123E9"),
- rawRow("1.4520622911234E9"),
- rawRow("1.45206229112345E9"),
- rawRow("1.452062291123456E9"));
- onTableGet(basicTableSchemaWithTime());
- onTableList(input);
-
- // Known results verified from BigQuery's export to JSON on GCS API.
- List<String> expected = ImmutableList.of(
- "2015-04-30 12:34:56.789 UTC",
- "2016-01-06 06:38:00 UTC",
- "2016-01-06 06:38:11 UTC",
- "2016-01-06 06:38:11.1 UTC",
- "2016-01-06 06:38:11.12 UTC",
- "2016-01-06 06:38:11.123 UTC",
- "2016-01-06 06:38:11.1234 UTC",
- "2016-01-06 06:38:11.12345 UTC",
- "2016-01-06 06:38:11.123456 UTC");
-
- // Download the rows, verify the interactions.
- List<TableRow> rows = new ArrayList<>();
- try (BigQueryTableRowIterator iterator =
- BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"), mockClient)) {
- iterator.open();
- while (iterator.advance()) {
- rows.add(iterator.getCurrent());
- }
- }
- verifyTableGet();
- verifyTabledataList();
-
- // Verify the timestamp converted as desired.
- assertEquals("Expected input and output rows to have the same size",
- expected.size(), rows.size());
- for (int i = 0; i < expected.size(); ++i) {
- assertEquals("i=" + i, expected.get(i), rows.get(i).get("time"));
- }
-
- }
-
- private TableRow rawRow(Object...args) {
- List<TableCell> cells = new LinkedList<>();
- for (Object a : args) {
- cells.add(new TableCell().setV(a));
- }
- return new TableRow().setF(cells);
- }
-
- private TableDataList rawDataList(TableRow...rows) {
- return new TableDataList()
- .setRows(Arrays.asList(rows));
- }
-
- @Test
- public void testRead() throws IOException, InterruptedException {
- onTableGet(basicTableSchema());
-
- TableDataList dataList = rawDataList(rawRow("Arthur", 42));
- onTableList(dataList);
-
- try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
- mockClient)) {
- iterator.open();
- Assert.assertTrue(iterator.advance());
- TableRow row = iterator.getCurrent();
-
- Assert.assertTrue(row.containsKey("name"));
- Assert.assertTrue(row.containsKey("answer"));
- Assert.assertEquals("Arthur", row.get("name"));
- Assert.assertEquals(42, row.get("answer"));
-
- Assert.assertFalse(iterator.advance());
-
- verifyTableGet();
- verifyTabledataList();
- }
- }
-
- @Test
- public void testReadEmpty() throws IOException, InterruptedException {
- onTableGet(basicTableSchema());
-
- // BigQuery may respond with a page token for an empty table, ensure we
- // handle it.
- TableDataList dataList = new TableDataList()
- .setPageToken("FEED==")
- .setTotalRows(0L);
- onTableList(dataList);
-
- try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
- mockClient)) {
- iterator.open();
-
- Assert.assertFalse(iterator.advance());
-
- verifyTableGet();
- verifyTabledataList();
- }
- }
-
- @Test
- public void testReadMultiPage() throws IOException, InterruptedException {
- onTableGet(basicTableSchema());
-
- TableDataList page1 = rawDataList(rawRow("Row1", 1))
- .setPageToken("page2");
- TableDataList page2 = rawDataList(rawRow("Row2", 2))
- .setTotalRows(2L);
-
- when(mockClient.tabledata())
- .thenReturn(mockTabledata);
- when(mockTabledata.list(anyString(), anyString(), anyString()))
- .thenReturn(mockTabledataList);
- when(mockTabledataList.execute())
- .thenReturn(page1)
- .thenReturn(page2);
-
- try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
- mockClient)) {
- iterator.open();
-
- List<String> names = new LinkedList<>();
- while (iterator.advance()) {
- names.add((String) iterator.getCurrent().get("name"));
- }
-
- Assert.assertThat(names, Matchers.hasItems("Row1", "Row2"));
-
- verifyTableGet();
- verifyTabledataList();
- // The second call should have used a page token.
- verify(mockTabledataList).setPageToken("page2");
- }
- }
-
- @Test
- public void testReadOpenFailure() throws IOException, InterruptedException {
- thrown.expect(IOException.class);
-
- when(mockClient.tables())
- .thenReturn(mockTables);
- when(mockTables.get(anyString(), anyString(), anyString()))
- .thenReturn(mockTablesGet);
- when(mockTablesGet.execute())
- .thenThrow(new IOException("No such table"));
-
- try (BigQueryTableRowIterator iterator = BigQueryTableRowIterator.fromTable(
- BigQueryIO.parseTableSpec("project:dataset.table"),
- mockClient)) {
- try {
- iterator.open(); // throws.
- } finally {
- verifyTableGet();
- }
- }
- }
-
- @Test
- public void testWriteAppend() throws IOException {
- onTableGet(basicTableSchema());
-
- TableReference ref = BigQueryIO
- .parseTableSpec("project:dataset.table");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
- inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_APPEND,
- BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-
- verifyTableGet();
- }
-
- @Test
- public void testWriteEmpty() throws IOException {
- onTableGet(basicTableSchema());
-
- TableDataList dataList = new TableDataList().setTotalRows(0L);
- onTableList(dataList);
-
- TableReference ref = BigQueryIO
- .parseTableSpec("project:dataset.table");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
- inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
- BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
-
- verifyTableGet();
- verifyTabledataList();
- }
-
- @Test
- public void testWriteEmptyFail() throws IOException {
- thrown.expect(IOException.class);
-
- onTableGet(basicTableSchema());
-
- TableDataList dataList = rawDataList(rawRow("Arthur", 42));
- onTableList(dataList);
-
- TableReference ref = BigQueryIO
- .parseTableSpec("project:dataset.table");
-
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options);
-
- try {
- inserter.getOrCreateTable(ref, BigQueryIO.Write.WriteDisposition.WRITE_EMPTY,
- BigQueryIO.Write.CreateDisposition.CREATE_NEVER, null);
- } finally {
- verifyTableGet();
- verifyTabledataList();
- }
- }
-
- @Test
- public void testInsertAll() throws Exception, IOException {
- // Build up a list of indices to fail on each invocation. This should result in
- // 5 calls to insertAll.
- List<List<Long>> errorsIndices = new ArrayList<>();
- errorsIndices.add(Arrays.asList(0L, 5L, 10L, 15L, 20L));
- errorsIndices.add(Arrays.asList(0L, 2L, 4L));
- errorsIndices.add(Arrays.asList(0L, 2L));
- errorsIndices.add(new ArrayList<Long>());
- onInsertAll(errorsIndices);
-
- TableReference ref = BigQueryIO
- .parseTableSpec("project:dataset.table");
- BigQueryTableInserter inserter = new BigQueryTableInserter(mockClient, options, 5);
-
- List<TableRow> rows = new ArrayList<>();
- List<String> ids = new ArrayList<>();
- for (int i = 0; i < 25; ++i) {
- rows.add(rawRow("foo", 1234));
- ids.add(new String());
- }
-
- InMemoryLongSumAggregator byteCountAggregator = new InMemoryLongSumAggregator("ByteCount");
- try {
- inserter.insertAll(ref, rows, ids, byteCountAggregator);
- } finally {
- verifyInsertAll(5);
- // Each of the 25 rows is 23 bytes: "{f=[{v=foo}, {v=1234}]}"
- assertEquals("Incorrect byte count", 25L * 23L, byteCountAggregator.getSum());
- }
- }
-
- private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
- private final String name;
- private long sum = 0;
-
- public InMemoryLongSumAggregator(String name) {
- this.name = name;
- }
-
- @Override
- public void addValue(Long value) {
- sum += value;
- }
-
- @Override
- public String getName() {
- return name;
- }
-
- @Override
- public CombineFn<Long, ?, Long> getCombineFn() {
- return new Sum.SumLongFn();
- }
-
- public long getSum() {
- return sum;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
index 83ffaa1..91d74db 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -30,8 +30,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-
import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
@@ -46,11 +44,8 @@ import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.util.NanoClock;
import com.google.api.client.util.Sleeper;
-import com.google.api.services.bigquery.Bigquery;
-import com.google.api.services.bigquery.model.TableReference;
-import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.storage.Storage;
-import com.google.common.collect.ImmutableList;
+import com.google.api.services.storage.Storage.Objects.Get;
import org.hamcrest.Matchers;
import org.junit.After;
@@ -279,21 +274,18 @@ public class RetryHttpRequestInitializerTest {
}
}).build();
- // A sample HTTP request to BigQuery that uses both default Transport and default
+ // A sample HTTP request to Google Cloud Storage that uses both default Transport and default
// RetryHttpInitializer.
- Bigquery b = new Bigquery.Builder(
+ Storage storage = new Storage.Builder(
transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()).build();
- BigQueryTableInserter inserter = new BigQueryTableInserter(b, PipelineOptionsFactory.create());
- TableReference t = new TableReference()
- .setProjectId("project").setDatasetId("dataset").setTableId("table");
+ Get getRequest = storage.objects().get("gs://fake", "file");
try {
- inserter.insertAll(t, ImmutableList.of(new TableRow()));
+ getRequest.execute();
fail();
} catch (Throwable e) {
- assertThat(e, Matchers.<Throwable>instanceOf(RuntimeException.class));
- assertThat(e.getCause(), Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
+ assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
assertEquals(1 + defaultNumberOfRetries, executeCount.get());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index b86f3b9..4ee83cf 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -107,6 +107,21 @@
<!-- Build dependencies -->
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-bigquery</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>util</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.cloud.datastore</groupId>
<artifactId>datastore-v1beta3-proto-client</artifactId>
</dependency>
@@ -122,6 +137,11 @@
</dependency>
<dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.cloud.bigtable</groupId>
<artifactId>bigtable-protos</artifactId>
<version>${bigtable.version}</version>
@@ -146,11 +166,21 @@
</dependency>
<dependency>
+ <groupId>com.google.api-client</groupId>
+ <artifactId>google-api-client</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.http-client</groupId>
+ <artifactId>google-http-client-jackson2</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client</artifactId>
</dependency>
@@ -183,6 +213,11 @@
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
<!-- test -->
<dependency>
<groupId>org.apache.beam</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
new file mode 100644
index 0000000..48e2258
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtils.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Verify.verify;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.generic.GenericRecord;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+/**
+ * A set of utilities for working with Avro files.
+ *
+ * <p>These utilities are based on the <a
+ * href="https://avro.apache.org/docs/1.8.1/spec.html">Avro 1.8.1</a> specification.
+ */
+class BigQueryAvroUtils {
+
+ /**
+ * Formats BigQuery seconds-since-epoch into String matching JSON export. Thread-safe and
+ * immutable.
+ */
+ private static final DateTimeFormatter DATE_AND_SECONDS_FORMATTER =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withZoneUTC();
+ // Package private for BigQueryTableRowIterator to use.
+ static String formatTimestamp(String timestamp) {
+ // timestamp is in "seconds since epoch" format, with scientific notation.
+ // e.g., "1.45206229112345E9" to mean "2016-01-06 06:38:11.123456 UTC".
+ // Separate into seconds and microseconds.
+ double timestampDoubleMicros = Double.parseDouble(timestamp) * 1000000;
+ long timestampMicros = (long) timestampDoubleMicros;
+ long seconds = timestampMicros / 1000000;
+ int micros = (int) (timestampMicros % 1000000);
+ String dayAndTime = DATE_AND_SECONDS_FORMATTER.print(seconds * 1000);
+
+ // No sub-second component.
+ if (micros == 0) {
+ return String.format("%s UTC", dayAndTime);
+ }
+
+ // Sub-second component.
+ int digits = 6;
+ int subsecond = micros;
+ while (subsecond % 10 == 0) {
+ digits--;
+ subsecond /= 10;
+ }
+ String formatString = String.format("%%0%dd", digits);
+ String fractionalSeconds = String.format(formatString, subsecond);
+ return String.format("%s.%s UTC", dayAndTime, fractionalSeconds);
+ }
+
+ /**
+ * Utility function to convert from an Avro {@link GenericRecord} to a BigQuery {@link TableRow}.
+ *
+ * See <a href="https://cloud.google.com/bigquery/exporting-data-from-bigquery#config">
+ * "Avro format"</a> for more information.
+ */
+ static TableRow convertGenericRecordToTableRow(GenericRecord record, TableSchema schema) {
+ return convertGenericRecordToTableRow(record, schema.getFields());
+ }
+
+ private static TableRow convertGenericRecordToTableRow(
+ GenericRecord record, List<TableFieldSchema> fields) {
+ TableRow row = new TableRow();
+ for (TableFieldSchema subSchema : fields) {
+ // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the name field
+ // is required, so it may not be null.
+ Field field = record.getSchema().getField(subSchema.getName());
+ Object convertedValue =
+ getTypedCellValue(field.schema(), subSchema, record.get(field.name()));
+ if (convertedValue != null) {
+ // To match the JSON files exported by BigQuery, do not include null values in the output.
+ row.set(field.name(), convertedValue);
+ }
+ }
+ return row;
+ }
+
+ @Nullable
+ private static Object getTypedCellValue(Schema schema, TableFieldSchema fieldSchema, Object v) {
+ // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the mode field
+ // is optional (and so it may be null), but defaults to "NULLABLE".
+ String mode = firstNonNull(fieldSchema.getMode(), "NULLABLE");
+ switch (mode) {
+ case "REQUIRED":
+ return convertRequiredField(schema.getType(), fieldSchema, v);
+ case "REPEATED":
+ return convertRepeatedField(schema, fieldSchema, v);
+ case "NULLABLE":
+ return convertNullableField(schema, fieldSchema, v);
+ default:
+ throw new UnsupportedOperationException(
+ "Parsing a field with BigQuery field schema mode " + fieldSchema.getMode());
+ }
+ }
+
+ private static List<Object> convertRepeatedField(
+ Schema schema, TableFieldSchema fieldSchema, Object v) {
+ Type arrayType = schema.getType();
+ verify(
+ arrayType == Type.ARRAY,
+ "BigQuery REPEATED field %s should be Avro ARRAY, not %s",
+ fieldSchema.getName(),
+ arrayType);
+ // REPEATED fields are represented as Avro arrays.
+ if (v == null) {
+ // Handle the case of an empty repeated field.
+ return ImmutableList.of();
+ }
+ @SuppressWarnings("unchecked")
+ List<Object> elements = (List<Object>) v;
+ ImmutableList.Builder<Object> values = ImmutableList.builder();
+ Type elementType = schema.getElementType().getType();
+ for (Object element : elements) {
+ values.add(convertRequiredField(elementType, fieldSchema, element));
+ }
+ return values.build();
+ }
+
+ private static Object convertRequiredField(
+ Type avroType, TableFieldSchema fieldSchema, Object v) {
+ // REQUIRED fields are represented as the corresponding Avro types. For example, a BigQuery
+ // INTEGER type maps to an Avro LONG type.
+ checkNotNull(v, "REQUIRED field %s should not be null", fieldSchema.getName());
+ ImmutableMap<String, Type> fieldMap =
+ ImmutableMap.<String, Type>builder()
+ .put("STRING", Type.STRING)
+ .put("INTEGER", Type.LONG)
+ .put("FLOAT", Type.DOUBLE)
+ .put("BOOLEAN", Type.BOOLEAN)
+ .put("TIMESTAMP", Type.LONG)
+ .put("RECORD", Type.RECORD)
+ .build();
+ // Per https://cloud.google.com/bigquery/docs/reference/v2/tables#schema, the type field
+ // is required, so it may not be null.
+ String bqType = fieldSchema.getType();
+ Type expectedAvroType = fieldMap.get(bqType);
+ verify(
+ avroType == expectedAvroType,
+ "Expected Avro schema type %s, not %s, for BigQuery %s field %s",
+ expectedAvroType,
+ avroType,
+ bqType,
+ fieldSchema.getName());
+ switch (fieldSchema.getType()) {
+ case "STRING":
+ // Avro will use a CharSequence to represent String objects, but it may not always use
+ // java.lang.String; for example, it may prefer org.apache.avro.util.Utf8.
+ verify(v instanceof CharSequence, "Expected CharSequence (String), got %s", v.getClass());
+ return v.toString();
+ case "INTEGER":
+ verify(v instanceof Long, "Expected Long, got %s", v.getClass());
+ return ((Long) v).toString();
+ case "FLOAT":
+ verify(v instanceof Double, "Expected Double, got %s", v.getClass());
+ return v;
+ case "BOOLEAN":
+ verify(v instanceof Boolean, "Expected Boolean, got %s", v.getClass());
+ return v;
+ case "TIMESTAMP":
+ // TIMESTAMP data types are represented as Avro LONG types. They are converted back to
+ // Strings with variable-precision (up to six digits) to match the JSON files export
+ // by BigQuery.
+ verify(v instanceof Long, "Expected Long, got %s", v.getClass());
+ Double doubleValue = ((Long) v) / 1000000.0;
+ return formatTimestamp(doubleValue.toString());
+ case "RECORD":
+ verify(v instanceof GenericRecord, "Expected GenericRecord, got %s", v.getClass());
+ return convertGenericRecordToTableRow((GenericRecord) v, fieldSchema.getFields());
+ default:
+ throw new UnsupportedOperationException(
+ String.format(
+ "Unexpected BigQuery field schema type %s for field named %s",
+ fieldSchema.getType(),
+ fieldSchema.getName()));
+ }
+ }
+
+ @Nullable
+ private static Object convertNullableField(
+ Schema avroSchema, TableFieldSchema fieldSchema, Object v) {
+ // NULLABLE fields are represented as an Avro Union of the corresponding type and "null".
+ verify(
+ avroSchema.getType() == Type.UNION,
+ "Expected Avro schema type UNION, not %s, for BigQuery NULLABLE field %s",
+ avroSchema.getType(),
+ fieldSchema.getName());
+ List<Schema> unionTypes = avroSchema.getTypes();
+ verify(
+ unionTypes.size() == 2,
+ "BigQuery NULLABLE field %s should be an Avro UNION of NULL and another type, not %s",
+ fieldSchema.getName(),
+ unionTypes);
+
+ if (v == null) {
+ return null;
+ }
+
+ Type firstType = unionTypes.get(0).getType();
+ if (!firstType.equals(Type.NULL)) {
+ return convertRequiredField(firstType, fieldSchema, v);
+ }
+ return convertRequiredField(unionTypes.get(1).getType(), fieldSchema, v);
+ }
+}