Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:02:58 UTC
[03/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
new file mode 100644
index 0000000..3e77362
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.options.BigQueryOptions;
+
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+
+/**
+ * An interface for real, mock, or fake implementations of Cloud BigQuery services.
+ */
+interface BigQueryServices extends Serializable {
+
+ /**
+ * Returns a real, mock, or fake {@link JobService}.
+ */
+ JobService getJobService(BigQueryOptions bqOptions);
+
+ /**
+ * Returns a real, mock, or fake {@link DatasetService}.
+ */
+ DatasetService getDatasetService(BigQueryOptions bqOptions);
+
+ /**
+ * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables.
+ */
+ BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef);
+
+ /**
+ * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
+ */
+ BigQueryJsonReader getReaderFromQuery(
+ BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten);
+
+ /**
+ * An interface for the Cloud BigQuery job service (load, extract, and query jobs).
+ */
+ interface JobService {
+ /**
+ * Start a BigQuery load job.
+ */
+ void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
+ throws InterruptedException, IOException;
+ /**
+ * Start a BigQuery extract job.
+ */
+ void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
+ throws InterruptedException, IOException;
+
+ /**
+ * Start a BigQuery query job.
+ */
+ void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
+ throws IOException, InterruptedException;
+
+ /**
+ * Waits for the job to complete (status DONE), and returns the job.
+ *
+ * <p>Returns null if the job is still not done after {@code maxAttempts} retries.
+ */
+ Job pollJob(JobReference jobRef, int maxAttempts)
+ throws InterruptedException, IOException;
+
+ /**
+ * Dry runs the query in the given project.
+ */
+ JobStatistics dryRunQuery(String projectId, String query)
+ throws InterruptedException, IOException;
+ }
+
+ /**
+ * An interface to get, create and delete Cloud BigQuery datasets and tables.
+ */
+ interface DatasetService {
+ /**
+ * Gets the specified {@link Table} resource by table ID.
+ */
+ Table getTable(String projectId, String datasetId, String tableId)
+ throws InterruptedException, IOException;
+
+ /**
+ * Deletes the table specified by tableId from the dataset.
+ * If the table contains data, all the data will be deleted.
+ */
+ void deleteTable(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException;
+
+ /**
+ * Returns true if the table is empty.
+ */
+ boolean isTableEmpty(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException;
+
+ /**
+ * Gets the specified {@link Dataset} resource by dataset ID.
+ */
+ Dataset getDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException;
+
+ /**
+ * Create a {@link Dataset} with the given {@code location} and {@code description}.
+ */
+ void createDataset(String projectId, String datasetId, String location, String description)
+ throws IOException, InterruptedException;
+
+ /**
+ * Deletes the dataset specified by the datasetId value.
+ *
+ * <p>Before you can delete a dataset, you must delete all its tables.
+ */
+ void deleteDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException;
+ }
+
+ /**
+ * An interface to read Cloud BigQuery tables directly.
+ */
+ interface BigQueryJsonReader {
+ /**
+ * Initializes the reader and advances the reader to the first record.
+ */
+ boolean start() throws IOException;
+
+ /**
+ * Advances the reader to the next valid record.
+ */
+ boolean advance() throws IOException;
+
+ /**
+ * Returns the value of the data item that was read by the last {@link #start} or
+ * {@link #advance} call. The returned value must be effectively immutable and remain valid
+ * indefinitely.
+ *
+ * <p>Multiple calls to this method without an intervening call to {@link #advance} should
+ * return the same result.
+ *
+ * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
+ * the last {@link #start} or {@link #advance} returned {@code false}.
+ */
+ TableRow getCurrent() throws NoSuchElementException;
+
+ /**
+ * Closes the reader. The reader cannot be used after this method is called.
+ */
+ void close() throws IOException;
+ }
+}
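
The interface above exists so that tests can swap the real service for a fake. As a rough sketch of what that enables, here is a hypothetical fake mirroring the JobService shape (illustrative only, not part of this commit): every started job is recorded and immediately reported as DONE, so tests never touch the network.

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;

import java.util.HashMap;
import java.util.Map;

// Hypothetical fake of the JobService shape for tests.
class FakeJobService {
  private final Map<String, Job> startedJobs = new HashMap<>();

  public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) {
    // Record the job as already finished; tests can assert on the stored configuration.
    startedJobs.put(jobRef.getJobId(), new Job()
        .setJobReference(jobRef)
        .setConfiguration(new JobConfiguration().setLoad(loadConfig))
        .setStatus(new JobStatus().setState("DONE")));
  }

  // Mirrors JobService.pollJob: the recorded job is already DONE, or null if unknown.
  public Job pollJob(JobReference jobRef, int maxAttempts) {
    return startedJobs.get(jobRef.getJobId());
  }
}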
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
new file mode 100644
index 0000000..414baae
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -0,0 +1,511 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.Transport;
+
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link BigQueryServices} that actually communicates with the Cloud BigQuery
+ * service.
+ */
+class BigQueryServicesImpl implements BigQueryServices {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
+
+ // The maximum number of attempts to execute a BigQuery RPC.
+ private static final int MAX_RPC_ATTEMPTS = 10;
+
+ // The initial backoff for executing a BigQuery RPC.
+ private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+
+ // The initial backoff for polling the status of a BigQuery job.
+ private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+
+ @Override
+ public JobService getJobService(BigQueryOptions options) {
+ return new JobServiceImpl(options);
+ }
+
+ @Override
+ public DatasetService getDatasetService(BigQueryOptions options) {
+ return new DatasetServiceImpl(options);
+ }
+
+ @Override
+ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
+ return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef);
+ }
+
+ @Override
+ public BigQueryJsonReader getReaderFromQuery(
+ BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
+ return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten);
+ }
+
+ @VisibleForTesting
+ static class JobServiceImpl implements BigQueryServices.JobService {
+ private final ApiErrorExtractor errorExtractor;
+ private final Bigquery client;
+
+ @VisibleForTesting
+ JobServiceImpl(Bigquery client) {
+ this.errorExtractor = new ApiErrorExtractor();
+ this.client = client;
+ }
+
+ private JobServiceImpl(BigQueryOptions options) {
+ this.errorExtractor = new ApiErrorExtractor();
+ this.client = Transport.newBigQueryClient(options).build();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public void startLoadJob(
+ JobReference jobRef,
+ JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
+ Job job = new Job()
+ .setJobReference(jobRef)
+ .setConfiguration(new JobConfiguration().setLoad(loadConfig));
+
+ startJob(job, errorExtractor, client);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
+ throws InterruptedException, IOException {
+ Job job = new Job()
+ .setJobReference(jobRef)
+ .setConfiguration(
+ new JobConfiguration().setExtract(extractConfig));
+
+ startJob(job, errorExtractor, client);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
+ throws IOException, InterruptedException {
+ Job job = new Job()
+ .setJobReference(jobRef)
+ .setConfiguration(
+ new JobConfiguration().setQuery(queryConfig));
+
+ startJob(job, errorExtractor, client);
+ }
+
+ private static void startJob(Job job,
+ ApiErrorExtractor errorExtractor,
+ Bigquery client) throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ startJob(job, errorExtractor, client, Sleeper.DEFAULT, backoff);
+ }
+
+ @VisibleForTesting
+ static void startJob(
+ Job job,
+ ApiErrorExtractor errorExtractor,
+ Bigquery client,
+ Sleeper sleeper,
+ BackOff backoff) throws IOException, InterruptedException {
+ JobReference jobRef = job.getJobReference();
+ Exception lastException = null;
+ do {
+ try {
+ client.jobs().insert(jobRef.getProjectId(), job).execute();
+ return; // SUCCEEDED
+ } catch (GoogleJsonResponseException e) {
+ if (errorExtractor.itemAlreadyExists(e)) {
+ return; // SUCCEEDED
+ }
+ // ignore and retry
+ LOG.warn("Ignore the error and retry inserting the job.", e);
+ lastException = e;
+ } catch (IOException e) {
+ // ignore and retry
+ LOG.warn("Ignore the error and retry inserting the job.", e);
+ lastException = e;
+ }
+ } while (nextBackOff(sleeper, backoff));
+ throw new IOException(
+ String.format(
+ "Unable to insert job: %s, aborting after %d .",
+ jobRef.getJobId(), MAX_RPC_ATTEMPTS),
+ lastException);
+ }
+
+ @Override
+ public Job pollJob(JobReference jobRef, int maxAttempts)
+ throws InterruptedException {
+ BackOff backoff = new AttemptBoundedExponentialBackOff(
+ maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
+ return pollJob(jobRef, Sleeper.DEFAULT, backoff);
+ }
+
+ @VisibleForTesting
+ Job pollJob(
+ JobReference jobRef,
+ Sleeper sleeper,
+ BackOff backoff) throws InterruptedException {
+ do {
+ try {
+ Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
+ JobStatus status = job.getStatus();
+ if (status != null && status.getState() != null && status.getState().equals("DONE")) {
+ return job;
+ }
+ // The job is not DONE, wait longer and retry.
+ } catch (IOException e) {
+ // ignore and retry
+ LOG.warn("Ignore the error and retry polling job status.", e);
+ }
+ } while (nextBackOff(sleeper, backoff));
+ LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId());
+ return null;
+ }
+
+ @Override
+ public JobStatistics dryRunQuery(String projectId, String query)
+ throws InterruptedException, IOException {
+ Job job = new Job()
+ .setConfiguration(new JobConfiguration()
+ .setQuery(new JobConfigurationQuery()
+ .setQuery(query))
+ .setDryRun(true));
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ return executeWithRetries(
+ client.jobs().insert(projectId, job),
+ String.format(
+ "Unable to dry run query: %s, aborting after %d retries.",
+ query, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff).getStatistics();
+ }
+ }
+
+ @VisibleForTesting
+ static class DatasetServiceImpl implements DatasetService {
+ private final ApiErrorExtractor errorExtractor;
+ private final Bigquery client;
+
+ private DatasetServiceImpl(BigQueryOptions bqOptions) {
+ this.errorExtractor = new ApiErrorExtractor();
+ this.client = Transport.newBigQueryClient(bqOptions).build();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public Table getTable(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ return executeWithRetries(
+ client.tables().get(projectId, datasetId, tableId),
+ String.format(
+ "Unable to get table: %s, aborting after %d retries.",
+ tableId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public void deleteTable(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ executeWithRetries(
+ client.tables().delete(projectId, datasetId, tableId),
+ String.format(
+ "Unable to delete table: %s, aborting after %d retries.",
+ tableId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+
+ @Override
+ public boolean isTableEmpty(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ TableDataList dataList = executeWithRetries(
+ client.tabledata().list(projectId, datasetId, tableId),
+ String.format(
+ "Unable to list table data: %s, aborting after %d retries.",
+ tableId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ return dataList.getRows() == null || dataList.getRows().isEmpty();
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public Dataset getDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ return executeWithRetries(
+ client.datasets().get(projectId, datasetId),
+ String.format(
+ "Unable to get dataset: %s, aborting after %d retries.",
+ datasetId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public void createDataset(
+ String projectId, String datasetId, String location, String description)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
+ }
+
+ @VisibleForTesting
+ void createDataset(
+ String projectId,
+ String datasetId,
+ String location,
+ String description,
+ Sleeper sleeper,
+ BackOff backoff) throws IOException, InterruptedException {
+ DatasetReference datasetRef = new DatasetReference()
+ .setProjectId(projectId)
+ .setDatasetId(datasetId);
+
+ Dataset dataset = new Dataset()
+ .setDatasetReference(datasetRef)
+ .setLocation(location)
+ .setFriendlyName(location)
+ .setDescription(description);
+
+ Exception lastException;
+ do {
+ try {
+ client.datasets().insert(projectId, dataset).execute();
+ return; // SUCCEEDED
+ } catch (GoogleJsonResponseException e) {
+ if (errorExtractor.itemAlreadyExists(e)) {
+ return; // SUCCEEDED
+ }
+ // ignore and retry
+ LOG.warn("Ignore the error and retry creating the dataset.", e);
+ lastException = e;
+ } catch (IOException e) {
+ LOG.warn("Ignore the error and retry creating the dataset.", e);
+ lastException = e;
+ }
+ } while (nextBackOff(sleeper, backoff));
+ throw new IOException(
+ String.format(
+ "Unable to create dataset: %s, aborting after %d .",
+ datasetId, MAX_RPC_ATTEMPTS),
+ lastException);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds {@code MAX_RPC_ATTEMPTS} retries.
+ */
+ @Override
+ public void deleteDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ executeWithRetries(
+ client.datasets().delete(projectId, datasetId),
+ String.format(
+ "Unable to delete table: %s, aborting after %d retries.",
+ datasetId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+ }
+
+ private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
+ BigQueryTableRowIterator iterator;
+
+ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
+ this.iterator = iterator;
+ }
+
+ private static BigQueryJsonReader fromQuery(
+ BigQueryOptions bqOptions,
+ String query,
+ String projectId,
+ @Nullable Boolean flattenResults) {
+ return new BigQueryJsonReaderImpl(
+ BigQueryTableRowIterator.fromQuery(
+ query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults));
+ }
+
+ private static BigQueryJsonReader fromTable(
+ BigQueryOptions bqOptions,
+ TableReference tableRef) {
+ return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
+ tableRef, Transport.newBigQueryClient(bqOptions).build()));
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ try {
+ iterator.open();
+ return iterator.advance();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during start() operation", e);
+ }
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ try {
+ return iterator.advance();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during advance() operation", e);
+ }
+ }
+
+ @Override
+ public TableRow getCurrent() throws NoSuchElementException {
+ return iterator.getCurrent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ iterator.close();
+ }
+ }
+
+ @VisibleForTesting
+ static <T> T executeWithRetries(
+ AbstractGoogleClientRequest<T> request,
+ String errorMessage,
+ Sleeper sleeper,
+ BackOff backoff)
+ throws IOException, InterruptedException {
+ Exception lastException = null;
+ do {
+ try {
+ return request.execute();
+ } catch (IOException e) {
+ LOG.warn("Ignore the error and retry the request.", e);
+ lastException = e;
+ }
+ } while (nextBackOff(sleeper, backoff));
+ throw new IOException(
+ errorMessage,
+ lastException);
+ }
+
+ /**
+ * Identical to {@link BackOffUtils#next} but without checked IOException.
+ * @throws InterruptedException
+ */
+ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
+ try {
+ return BackOffUtils.next(sleeper, backoff);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
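
Every RPC in the class above follows the same attempt-bounded shape: execute, treat IOException as transient, and consult the BackOff before trying again, surfacing the last failure once the attempts are exhausted. Stripped of the BigQuery specifics, the pattern looks roughly like this (illustrative helper and names, not part of the commit):

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;

import java.io.IOException;
import java.util.concurrent.Callable;

class RetrySketch {
  // Attempt the call; on IOException, sleep per the BackOff and try again
  // until the BackOff reports it is exhausted, then surface the last failure.
  static <T> T callWithRetries(Callable<T> call, Sleeper sleeper, BackOff backoff)
      throws Exception {
    IOException lastException;
    do {
      try {
        return call.call();
      } catch (IOException e) {
        lastException = e; // Treated as transient, same as startJob above.
      }
    } while (BackOffUtils.next(sleeper, backoff)); // false once attempts run out.
    throw new IOException("Retries exhausted", lastException);
  }
}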
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
new file mode 100644
index 0000000..00a4fa3
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableInserter.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
+
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ExponentialBackOff;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
+import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.annotations.VisibleForTesting;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * Inserts rows into BigQuery.
+ */
+class BigQueryTableInserter {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableInserter.class);
+
+ // Approximate amount of table data to upload per InsertAll request.
+ private static final long UPLOAD_BATCH_SIZE_BYTES = 64 * 1024;
+
+ // The maximum number of rows to upload per InsertAll request.
+ private static final long MAX_ROWS_PER_BATCH = 500;
+
+ // The maximum number of times to retry inserting rows into BigQuery.
+ private static final int MAX_INSERT_ATTEMPTS = 5;
+
+ // The initial backoff after a failure inserting rows into BigQuery.
+ private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;
+
+ // Backoff time bounds for rate limit exceeded errors.
+ private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
+ private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);
+
+ private final Bigquery client;
+ private final long maxRowsPerBatch;
+
+ private ExecutorService executor;
+
+ /**
+ * Constructs a new row inserter.
+ *
+ * @param client a BigQuery client
+ * @param options a PipelineOptions object
+ */
+ BigQueryTableInserter(Bigquery client, PipelineOptions options) {
+ this.client = client;
+ this.maxRowsPerBatch = MAX_ROWS_PER_BATCH;
+ this.executor = options.as(GcsOptions.class).getExecutorService();
+ }
+
+ /**
+ * Constructs a new row inserter.
+ *
+ * @param client a BigQuery client
+ * @param options a PipelineOptions object
+ * @param maxRowsPerBatch maximum number of rows to insert per call to BigQuery
+ */
+ BigQueryTableInserter(Bigquery client, PipelineOptions options,
+ int maxRowsPerBatch) {
+ this.client = client;
+ this.maxRowsPerBatch = maxRowsPerBatch;
+ this.executor = options.as(GcsOptions.class).getExecutorService();
+ }
+
+ /**
+ * Insert all rows from the given list.
+ */
+ void insertAll(TableReference ref, List<TableRow> rowList) throws IOException {
+ insertAll(ref, rowList, null, null);
+ }
+
+ /**
+ * Insert all rows from the given list using specified insertIds if not null. Track count of
+ * bytes written with the Aggregator.
+ */
+ void insertAll(TableReference ref, List<TableRow> rowList,
+ @Nullable List<String> insertIdList, @Nullable Aggregator<Long, Long> byteCountAggregator)
+ throws IOException {
+ checkNotNull(ref, "ref");
+ if (insertIdList != null && rowList.size() != insertIdList.size()) {
+ throw new AssertionError("If insertIdList is not null it needs to have the same "
+ + "number of elements as rowList");
+ }
+
+ AttemptBoundedExponentialBackOff backoff = new AttemptBoundedExponentialBackOff(
+ MAX_INSERT_ATTEMPTS,
+ INITIAL_INSERT_BACKOFF_INTERVAL_MS);
+
+ List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>();
+ // These lists contain the rows to publish. Initially they contain the entire list. If there are
+ // failures, they will contain only the failed rows to be retried.
+ List<TableRow> rowsToPublish = rowList;
+ List<String> idsToPublish = insertIdList;
+ while (true) {
+ List<TableRow> retryRows = new ArrayList<>();
+ List<String> retryIds = (idsToPublish != null) ? new ArrayList<String>() : null;
+
+ int strideIndex = 0;
+ // Upload in batches.
+ List<TableDataInsertAllRequest.Rows> rows = new LinkedList<>();
+ int dataSize = 0;
+
+ List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>();
+ List<Integer> strideIndices = new ArrayList<>();
+
+ for (int i = 0; i < rowsToPublish.size(); ++i) {
+ TableRow row = rowsToPublish.get(i);
+ TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows();
+ if (idsToPublish != null) {
+ out.setInsertId(idsToPublish.get(i));
+ }
+ out.setJson(row.getUnknownKeys());
+ rows.add(out);
+
+ dataSize += row.toString().length();
+ if (dataSize >= UPLOAD_BATCH_SIZE_BYTES || rows.size() >= maxRowsPerBatch
+ || i == rowsToPublish.size() - 1) {
+ TableDataInsertAllRequest content = new TableDataInsertAllRequest();
+ content.setRows(rows);
+
+ final Bigquery.Tabledata.InsertAll insert = client.tabledata()
+ .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(),
+ content);
+
+ futures.add(
+ executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
+ @Override
+ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
+ BackOff backoff = new IntervalBoundedExponentialBackOff(
+ MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
+ while (true) {
+ try {
+ return insert.execute().getInsertErrors();
+ } catch (IOException e) {
+ if (new ApiErrorExtractor().rateLimited(e)) {
+ LOG.info("BigQuery insertAll exceeded rate limit, retrying");
+ try {
+ Thread.sleep(backoff.nextBackOffMillis());
+ } catch (InterruptedException interrupted) {
+ throw new IOException(
+ "Interrupted while waiting before retrying insertAll");
+ }
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+ }));
+ strideIndices.add(strideIndex);
+
+ if (byteCountAggregator != null) {
+ byteCountAggregator.addValue((long) dataSize);
+ }
+ dataSize = 0;
+ strideIndex = i + 1;
+ rows = new LinkedList<>();
+ }
+ }
+
+ try {
+ for (int i = 0; i < futures.size(); i++) {
+ List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get();
+ if (errors != null) {
+ for (TableDataInsertAllResponse.InsertErrors error : errors) {
+ allErrors.add(error);
+ if (error.getIndex() == null) {
+ throw new IOException("Insert failed: " + allErrors);
+ }
+
+ int errorIndex = error.getIndex().intValue() + strideIndices.get(i);
+ retryRows.add(rowsToPublish.get(errorIndex));
+ if (retryIds != null) {
+ retryIds.add(idsToPublish.get(errorIndex));
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while inserting " + rowsToPublish);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ }
+
+ if (!allErrors.isEmpty() && !backoff.atMaxAttempts()) {
+ try {
+ Thread.sleep(backoff.nextBackOffMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while waiting before retrying insert of " + retryRows);
+ }
+ LOG.info("Retrying failed inserts to BigQuery");
+ rowsToPublish = retryRows;
+ idsToPublish = retryIds;
+ allErrors.clear();
+ } else {
+ break;
+ }
+ }
+ if (!allErrors.isEmpty()) {
+ throw new IOException("Insert failed: " + allErrors);
+ }
+ }
+
+ /**
+ * Retrieves or creates the table.
+ *
+ * <p>The table is checked to conform to insertion requirements as specified
+ * by WriteDisposition and CreateDisposition.
+ *
+ * <p>If table truncation is requested (WriteDisposition.WRITE_TRUNCATE), then
+ * this will re-create the table if necessary to ensure it is empty.
+ *
+ * <p>If an empty table is required (WriteDisposition.WRITE_EMPTY), then this
+ * will fail if the table exists and is not empty.
+ *
+ * <p>When constructing a table, a {@code TableSchema} must be available. If a
+ * schema is provided, then it will be used. If no schema is provided, but
+ * an existing table is being cleared (WRITE_TRUNCATE option above), then
+ * the existing schema will be re-used. If no schema is available, then an
+ * {@code IOException} is thrown.
+ */
+ Table getOrCreateTable(
+ TableReference ref,
+ WriteDisposition writeDisposition,
+ CreateDisposition createDisposition,
+ @Nullable TableSchema schema) throws IOException {
+ // Check if table already exists.
+ Bigquery.Tables.Get get = client.tables()
+ .get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+ Table table = null;
+ try {
+ table = get.execute();
+ } catch (IOException e) {
+ ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+ if (!errorExtractor.itemNotFound(e)
+ || createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
+ // Rethrow.
+ throw e;
+ }
+ }
+
+ // If we want an empty table, and it isn't, then delete it first.
+ if (table != null) {
+ if (writeDisposition == WriteDisposition.WRITE_APPEND) {
+ return table;
+ }
+
+ boolean empty = isEmpty(ref);
+ if (empty) {
+ if (writeDisposition == WriteDisposition.WRITE_TRUNCATE) {
+ LOG.info("Empty table found, not removing {}", BigQueryIO.toTableSpec(ref));
+ }
+ return table;
+
+ } else if (writeDisposition == WriteDisposition.WRITE_EMPTY) {
+ throw new IOException("WriteDisposition is WRITE_EMPTY, "
+ + "but table is not empty");
+ }
+
+ // Reuse the existing schema if none was provided.
+ if (schema == null) {
+ schema = table.getSchema();
+ }
+
+ // Delete table and fall through to re-creating it below.
+ LOG.info("Deleting table {}", BigQueryIO.toTableSpec(ref));
+ Bigquery.Tables.Delete delete = client.tables()
+ .delete(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+ delete.execute();
+ }
+
+ if (schema == null) {
+ throw new IllegalArgumentException(
+ "Table schema required for new table.");
+ }
+
+ // Create the table.
+ return tryCreateTable(ref, schema);
+ }
+
+ /**
+ * Checks if a table is empty.
+ */
+ private boolean isEmpty(TableReference ref) throws IOException {
+ Bigquery.Tabledata.List list = client.tabledata()
+ .list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+ list.setMaxResults(1L);
+ TableDataList dataList = list.execute();
+
+ return dataList.getRows() == null || dataList.getRows().isEmpty();
+ }
+
+ /**
+ * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the
+ * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is
+ * configured with a table spec function to use different tables for each window.
+ */
+ private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = (int) TimeUnit.MINUTES.toMillis(5);
+
+ /**
+ * Tries to create the BigQuery table.
+ * If a table with the same name already exists in the dataset, the table
+ * creation fails, and the function returns null. In such a case,
+ * the existing table doesn't necessarily have the same schema as specified
+ * by the parameter.
+ *
+ * @param schema Schema of the new BigQuery table.
+ * @return The newly created BigQuery table information, or null if the table
+ * with the same name already exists.
+ * @throws IOException if other error than already existing table occurs.
+ */
+ @Nullable
+ private Table tryCreateTable(TableReference ref, TableSchema schema) throws IOException {
+ LOG.info("Trying to create BigQuery table: {}", BigQueryIO.toTableSpec(ref));
+ BackOff backoff =
+ new ExponentialBackOff.Builder()
+ .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
+ .build();
+
+ Table table = new Table().setTableReference(ref).setSchema(schema);
+ return tryCreateTable(table, ref.getProjectId(), ref.getDatasetId(), backoff, Sleeper.DEFAULT);
+ }
+
+ @VisibleForTesting
+ @Nullable
+ Table tryCreateTable(
+ Table table, String projectId, String datasetId, BackOff backoff, Sleeper sleeper)
+ throws IOException {
+ boolean retry = false;
+ while (true) {
+ try {
+ return client.tables().insert(projectId, datasetId, table).execute();
+ } catch (IOException e) {
+ ApiErrorExtractor extractor = new ApiErrorExtractor();
+ if (extractor.itemAlreadyExists(e)) {
+ // The table already exists, nothing to return.
+ return null;
+ } else if (extractor.rateLimited(e)) {
+ // The request failed because we hit a temporary quota. Back off and try again.
+ try {
+ if (BackOffUtils.next(sleeper, backoff)) {
+ if (!retry) {
+ LOG.info(
+ "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes",
+ projectId,
+ datasetId,
+ table.getTableReference().getTableId(),
+ TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0);
+ retry = true;
+ }
+ continue;
+ }
+ } catch (InterruptedException e1) {
+ // Restore interrupted state and throw the last failure.
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+ throw e;
+ }
+ }
+ }
+}
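
The heart of insertAll above is its batching rule: accumulate rows until either the approximate byte budget (UPLOAD_BATCH_SIZE_BYTES) or the row cap (MAX_ROWS_PER_BATCH) is reached, then flush. In isolation the rule looks roughly like this (an illustrative sketch using the same row-size estimate as the inserter, not part of the commit):

import com.google.api.services.bigquery.model.TableRow;

import java.util.ArrayList;
import java.util.List;

class BatchSketch {
  // Greedily fill a batch; flush once the byte estimate or the row cap is hit,
  // which is the same trigger insertAll applies inside its publish loop.
  static List<List<TableRow>> batch(List<TableRow> rows, long maxBytes, long maxRows) {
    List<List<TableRow>> batches = new ArrayList<>();
    List<TableRow> current = new ArrayList<>();
    long byteEstimate = 0;
    for (TableRow row : rows) {
      current.add(row);
      byteEstimate += row.toString().length(); // Same size estimate as the inserter.
      if (byteEstimate >= maxBytes || current.size() >= maxRows) {
        batches.add(current);
        current = new ArrayList<>();
        byteEstimate = 0;
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // Final partial batch.
    }
    return batches;
  }
}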
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
new file mode 100644
index 0000000..3afdffa
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableRowIterator.java
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.ClassInfo;
+import com.google.api.client.util.Data;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.Bigquery.Jobs.Insert;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
+import com.google.api.services.bigquery.model.ErrorProto;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfiguration;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableCell;
+import com.google.api.services.bigquery.model.TableDataList;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.Nullable;
+
+/**
+ * Iterates over all rows in a table.
+ */
+class BigQueryTableRowIterator implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class);
+
+ @Nullable private TableReference ref;
+ @Nullable private final String projectId;
+ @Nullable private TableSchema schema;
+ private final Bigquery client;
+ private String pageToken;
+ private Iterator<TableRow> iteratorOverCurrentBatch;
+ private TableRow current;
+ // Set true when the final page is seen from the service.
+ private boolean lastPage = false;
+
+ // The maximum number of times a BigQuery request will be retried
+ private static final int MAX_RETRIES = 3;
+ // Initial wait time for the backoff implementation
+ private static final Duration INITIAL_BACKOFF_TIME = Duration.standardSeconds(1);
+
+ // After sending a query to the BQ service, we poll it at the following interval to check
+ // the status of the query execution job.
+ private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);
+
+ private final String query;
+ // Whether to flatten query results.
+ private final boolean flattenResults;
+ // Temporary dataset used to store query results.
+ private String temporaryDatasetId = null;
+ // Temporary table used to store query results.
+ private String temporaryTableId = null;
+
+ private BigQueryTableRowIterator(
+ @Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
+ Bigquery client, boolean flattenResults) {
+ this.ref = ref;
+ this.query = query;
+ this.projectId = projectId;
+ this.client = checkNotNull(client, "client");
+ this.flattenResults = flattenResults;
+ }
+
+ /**
+ * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table.
+ */
+ public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
+ checkNotNull(ref, "ref");
+ checkNotNull(client, "client");
+ return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true);
+ }
+
+ /**
+ * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
+ * specified query in the specified project.
+ */
+ public static BigQueryTableRowIterator fromQuery(
+ String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) {
+ checkNotNull(query, "query");
+ checkNotNull(projectId, "projectId");
+ checkNotNull(client, "client");
+ return new BigQueryTableRowIterator(null, query, projectId, client,
+ MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
+ }
+
+ /**
+ * Opens the table for read.
+ * @throws IOException on failure
+ */
+ public void open() throws IOException, InterruptedException {
+ if (query != null) {
+ ref = executeQueryAndWaitForCompletion();
+ }
+ // Get table schema.
+ Bigquery.Tables.Get get =
+ client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+
+ Table table =
+ executeWithBackOff(
+ get,
+ "Error opening BigQuery table %s of dataset %s : {}",
+ ref.getTableId(),
+ ref.getDatasetId());
+ schema = table.getSchema();
+ }
+
+ public boolean advance() throws IOException, InterruptedException {
+ while (true) {
+ if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) {
+ // Embed schema information into the raw row, so that values have an
+ // associated key. This matches how rows are read when using the
+ // DataflowRunner.
+ current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next());
+ return true;
+ }
+ if (lastPage) {
+ return false;
+ }
+
+ Bigquery.Tabledata.List list =
+ client.tabledata().list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId());
+ if (pageToken != null) {
+ list.setPageToken(pageToken);
+ }
+
+ TableDataList result =
+ executeWithBackOff(
+ list,
+ "Error reading from BigQuery table %s of dataset %s : {}",
+ ref.getTableId(),
+ ref.getDatasetId());
+
+ pageToken = result.getPageToken();
+ iteratorOverCurrentBatch =
+ result.getRows() != null
+ ? result.getRows().iterator()
+ : Collections.<TableRow>emptyIterator();
+
+ // The server may return a page token indefinitely on a zero-length table.
+ if (pageToken == null || result.getTotalRows() != null && result.getTotalRows() == 0) {
+ lastPage = true;
+ }
+ }
+ }
+
+ public TableRow getCurrent() {
+ if (current == null) {
+ throw new NoSuchElementException();
+ }
+ return current;
+ }
+
+ /**
+ * Adjusts a field returned from the BigQuery API to match what we will receive when running
+ * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation
+ * used for batch jobs executed on the Cloud Dataflow service.
+ *
+ * <p>The following is the relationship between BigQuery schema and Java types:
+ *
+ * <ul>
+ * <li>Nulls are {@code null}.
+ * <li>Repeated fields are {@code List} of objects.
+ * <li>Record columns are {@link TableRow} objects.
+ * <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects.
+ * <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects.
+ * <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format
+ * {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing
+ * zeros and can be 1 to 6 digits long.
+ * <li>Every other atomic type is a {@code String}.
+ * </ul>
+ *
+ * <p>Note that integers are encoded as strings to match BigQuery's exported JSON format.
+ *
+ * <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs
+ * and are not accessible through the {@link TableRow#getF} function.
+ */
+ @Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
+ if (Data.isNull(v)) {
+ return null;
+ }
+
+ if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
+ TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
+ @SuppressWarnings("unchecked")
+ List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
+ ImmutableList.Builder<Object> values = ImmutableList.builder();
+ for (Map<String, Object> element : rawCells) {
+ values.add(getTypedCellValue(elementSchema, element.get("v")));
+ }
+ return values.build();
+ }
+
+ if (fieldSchema.getType().equals("RECORD")) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> typedV = (Map<String, Object>) v;
+ return getTypedTableRow(fieldSchema.getFields(), typedV);
+ }
+
+ if (fieldSchema.getType().equals("FLOAT")) {
+ return Double.parseDouble((String) v);
+ }
+
+ if (fieldSchema.getType().equals("BOOLEAN")) {
+ return Boolean.parseBoolean((String) v);
+ }
+
+ if (fieldSchema.getType().equals("TIMESTAMP")) {
+ return BigQueryAvroUtils.formatTimestamp((String) v);
+ }
+
+ return v;
+ }
+
+ /**
+ * A list of the field names that cannot be used in BigQuery tables processed by Dataflow,
+ * because they are reserved keywords in {@link TableRow}.
+ */
+ // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does
+ // not indirect through our broken use of {@link TableRow}.
+ // See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41
+ private static final Collection<String> RESERVED_FIELD_NAMES =
+ ClassInfo.of(TableRow.class).getNames();
+
+ /**
+ * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a
+ * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in
+ * the cells are converted to Java types according to the provided field schemas.
+ *
+ * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery
+ * types are mapped to Java types.
+ */
+ private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
+ // If rawRow is a TableRow, use it. If not, create a new one.
+ TableRow row;
+ List<? extends Map<String, Object>> cells;
+ if (rawRow instanceof TableRow) {
+ // Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
+ // any type conversion, but extract the cells for cell-wise processing below.
+ row = (TableRow) rawRow;
+ cells = row.getF();
+ // Clear the cells from the row, so that row.getF() will return null. This matches the
+ // behavior of rows produced by the BigQuery export API used on the service.
+ row.setF(null);
+ } else {
+ row = new TableRow();
+
+ // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
+ // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
+ // we will use Map.get("v") instead of TableCell.getV() get its value.
+ @SuppressWarnings("unchecked")
+ List<? extends Map<String, Object>> rawCells =
+ (List<? extends Map<String, Object>>) rawRow.get("f");
+ cells = rawCells;
+ }
+
+ checkState(cells.size() == fields.size(),
+ "Expected that the row has the same number of cells %s as fields in the schema %s",
+ cells.size(), fields.size());
+
+ // Loop through all the fields in the row, normalizing their types with the TableFieldSchema
+ // and storing the normalized values by field name in the Map<String, Object> that
+ // underlies the TableRow.
+ Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
+ Iterator<TableFieldSchema> fieldIt = fields.iterator();
+ while (cellIt.hasNext()) {
+ Map<String, Object> cell = cellIt.next();
+ TableFieldSchema fieldSchema = fieldIt.next();
+
+ // Convert the object in this cell to the Java type corresponding to its type in the schema.
+ Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
+
+ String fieldName = fieldSchema.getName();
+ checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName),
+ "BigQueryIO does not support records with columns named %s", fieldName);
+
+ if (convertedValue == null) {
+ // BigQuery does not include null values when the export operation (to JSON) is used.
+ // To match that behavior, BigQueryTableRowIterator, and the DirectRunner,
+ // intentionally omits columns with null values.
+ continue;
+ }
+
+ row.set(fieldName, convertedValue);
+ }
+ return row;
+ }
+
+ // Create a new BigQuery dataset
+ private void createDataset(String datasetId) throws IOException, InterruptedException {
+ Dataset dataset = new Dataset();
+ DatasetReference reference = new DatasetReference();
+ reference.setProjectId(projectId);
+ reference.setDatasetId(datasetId);
+ dataset.setDatasetReference(reference);
+
+ String createDatasetError =
+ "Error when trying to create the temporary dataset " + datasetId + " in project "
+ + projectId;
+ executeWithBackOff(
+ client.datasets().insert(projectId, dataset), createDatasetError + " :{}");
+ }
+
+ // Delete the given table that is available in the given dataset.
+ private void deleteTable(String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ executeWithBackOff(
+ client.tables().delete(projectId, datasetId, tableId),
+ "Error when trying to delete the temporary table " + datasetId + " in dataset " + datasetId
+ + " of project " + projectId + ". Manual deletion may be required. Error message : {}");
+ }
+
+ // Delete the given dataset. This will fail if the given dataset has any tables.
+ private void deleteDataset(String datasetId) throws IOException, InterruptedException {
+ executeWithBackOff(
+ client.datasets().delete(projectId, datasetId),
+ "Error when trying to delete the temporary dataset " + datasetId + " in project "
+ + projectId + ". Manual deletion may be required. Error message : {}");
+ }
+
+ /**
+ * Executes the specified query and returns a reference to the temporary BigQuery table created
+ * to hold the results.
+ *
+ * @throws IOException if the query fails.
+ */
+ private TableReference executeQueryAndWaitForCompletion()
+ throws IOException, InterruptedException {
+ // Create a temporary dataset to store results.
+ // Starting dataset name with an "_" so that it is hidden.
+ Random rnd = new Random(System.currentTimeMillis());
+ temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
+ temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
+
+ createDataset(temporaryDatasetId);
+ Job job = new Job();
+ JobConfiguration config = new JobConfiguration();
+ JobConfigurationQuery queryConfig = new JobConfigurationQuery();
+ config.setQuery(queryConfig);
+ job.setConfiguration(config);
+ queryConfig.setQuery(query);
+ queryConfig.setAllowLargeResults(true);
+ queryConfig.setFlattenResults(flattenResults);
+
+ TableReference destinationTable = new TableReference();
+ destinationTable.setProjectId(projectId);
+ destinationTable.setDatasetId(temporaryDatasetId);
+ destinationTable.setTableId(temporaryTableId);
+ queryConfig.setDestinationTable(destinationTable);
+
+ Insert insert = client.jobs().insert(projectId, job);
+ Job queryJob = executeWithBackOff(
+ insert, "Error when trying to execute the job for query " + query + " :{}");
+ JobReference jobId = queryJob.getJobReference();
+
+ while (true) {
+ Job pollJob = executeWithBackOff(
+ client.jobs().get(projectId, jobId.getJobId()),
+ "Error when trying to get status of the job for query " + query + " :{}");
+ JobStatus status = pollJob.getStatus();
+ if (status.getState().equals("DONE")) {
+ // Job is DONE, but did not necessarily succeed.
+ ErrorProto error = status.getErrorResult();
+ if (error == null) {
+ return pollJob.getConfiguration().getQuery().getDestinationTable();
+ } else {
+ // There will be no temporary table to delete, so null out the reference.
+ temporaryTableId = null;
+ throw new IOException("Executing query " + query + " failed: " + error.getMessage());
+ }
+ }
+ Uninterruptibles.sleepUninterruptibly(
+ QUERY_COMPLETION_POLL_TIME.getMillis(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ // Execute a BQ request with exponential backoff and return the result.
+ // client - BQ request to be executed
+ // error - Formatted message to log when a request fails. Takes the exception message as a
+ // formatter parameter.
+ public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error,
+ Object... errorArgs) throws IOException, InterruptedException {
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backOff =
+ new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_TIME.getMillis());
+
+ T result = null;
+ while (true) {
+ try {
+ result = client.execute();
+ break;
+ } catch (IOException e) {
+ LOG.error(String.format(error, errorArgs), e.getMessage());
+ if (!BackOffUtils.next(sleeper, backOff)) {
+ LOG.error(
+ String.format(error, errorArgs), "Failing after retrying " + MAX_RETRIES + " times.");
+ throw e;
+ }
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public void close() {
+ // Prevent any further requests.
+ lastPage = true;
+
+ try {
+ // Delete the temporary table and dataset that get generated when executing a query.
+ if (temporaryDatasetId != null) {
+ if (temporaryTableId != null) {
+ deleteTable(temporaryDatasetId, temporaryTableId);
+ }
+ deleteDataset(temporaryDatasetId);
+ }
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new RuntimeException(e);
+ }
+
+ }
+}
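
The executeWithBackOff helper above retries a request, sleeping with exponentially growing
intervals between attempts and rethrowing the last IOException once the attempt budget is
exhausted. A minimal, self-contained sketch of the same pattern using only the JDK follows;
the MAX_RETRIES and INITIAL_BACKOFF_MILLIS constants and the flaky request in main are
illustrative placeholders, not part of this commit:

    import java.io.IOException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;

    public class BackOffSketch {
      // Illustrative constants; the commit uses MAX_RETRIES and INITIAL_BACKOFF_TIME
      // from the surrounding class, whose values are not shown in this diff.
      private static final int MAX_RETRIES = 3;
      private static final long INITIAL_BACKOFF_MILLIS = 1000;

      // Executes the request, sleeping between failed attempts with exponentially
      // growing intervals, and rethrows once the attempt budget is exhausted.
      static <T> T executeWithBackOff(Callable<T> request) throws Exception {
        long backOffMillis = INITIAL_BACKOFF_MILLIS;
        for (int attempt = 0; ; attempt++) {
          try {
            return request.call();
          } catch (IOException e) {
            if (attempt >= MAX_RETRIES) {
              throw e;  // Failing after retrying MAX_RETRIES times.
            }
            TimeUnit.MILLISECONDS.sleep(backOffMillis);
            backOffMillis *= 2;  // Double the wait before the next attempt.
          }
        }
      }

      public static void main(String[] args) throws Exception {
        // Hypothetical flaky request that succeeds on its third attempt.
        final int[] calls = {0};
        String result = executeWithBackOff(() -> {
          if (++calls[0] < 3) {
            throw new IOException("transient error");
          }
          return "done";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
      }
    }

Bounding the number of attempts rather than total wall-clock time keeps worst-case latency
predictable while still absorbing transient BigQuery errors.
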
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java
new file mode 100644
index 0000000..a6fc693
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Defines transforms for reading and writing from Google BigQuery.
+ *
+ * @see org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
new file mode 100644
index 0000000..316392f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryAvroUtilsTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.Lists;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests for {@link BigQueryAvroUtils}.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryAvroUtilsTest {
+ @Test
+ public void testConvertGenericRecordToTableRow() throws Exception {
+ TableSchema tableSchema = new TableSchema();
+ List<TableFieldSchema> subFields = Lists.<TableFieldSchema>newArrayList(
+ new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"));
+ /*
+ * Note that the quality and quantity fields do not have their mode set, so they should default
+ * to NULLABLE. This is an important test of BigQuery semantics.
+ *
+ * All the other fields we set in this function are required in the Schema response.
+ *
+ * See https://cloud.google.com/bigquery/docs/reference/v2/tables#schema
+ */
+ List<TableFieldSchema> fields =
+ Lists.<TableFieldSchema>newArrayList(
+ new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"),
+ new TableFieldSchema().setName("species").setType("STRING").setMode("NULLABLE"),
+ new TableFieldSchema().setName("quality").setType("FLOAT") /* default to NULLABLE */,
+ new TableFieldSchema().setName("quantity").setType("INTEGER") /* default to NULLABLE */,
+ new TableFieldSchema().setName("birthday").setType("TIMESTAMP").setMode("NULLABLE"),
+ new TableFieldSchema().setName("flighted").setType("BOOLEAN").setMode("NULLABLE"),
+ new TableFieldSchema().setName("scion").setType("RECORD").setMode("NULLABLE")
+ .setFields(subFields),
+ new TableFieldSchema().setName("associates").setType("RECORD").setMode("REPEATED")
+ .setFields(subFields));
+ tableSchema.setFields(fields);
+ Schema avroSchema = AvroCoder.of(Bird.class).getSchema();
+
+ {
+ // Test nullable fields.
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("number", 5L);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ TableRow row = new TableRow()
+ .set("number", "5")
+ .set("associates", new ArrayList<TableRow>());
+ assertEquals(row, convertedRow);
+ }
+ {
+ // Test type conversion for TIMESTAMP, INTEGER, BOOLEAN, and FLOAT.
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("number", 5L);
+ record.put("quality", 5.0);
+ record.put("birthday", 5L);
+ record.put("flighted", Boolean.TRUE);
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ TableRow row = new TableRow()
+ .set("number", "5")
+ .set("birthday", "1970-01-01 00:00:00.000005 UTC")
+ .set("quality", 5.0)
+ .set("associates", new ArrayList<TableRow>())
+ .set("flighted", Boolean.TRUE);
+ assertEquals(row, convertedRow);
+ }
+ {
+ // Test repeated fields.
+ Schema subBirdSchema = AvroCoder.of(Bird.SubBird.class).getSchema();
+ GenericRecord nestedRecord = new GenericData.Record(subBirdSchema);
+ nestedRecord.put("species", "other");
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("number", 5L);
+ record.put("associates", Lists.<GenericRecord>newArrayList(nestedRecord));
+ TableRow convertedRow = BigQueryAvroUtils.convertGenericRecordToTableRow(record, tableSchema);
+ TableRow row = new TableRow()
+ .set("associates", Lists.<TableRow>newArrayList(
+ new TableRow().set("species", "other")))
+ .set("number", "5");
+ assertEquals(row, convertedRow);
+ }
+ }
+
+ /**
+ * POJO class used as the record type in tests.
+ */
+ @DefaultCoder(AvroCoder.class)
+ @SuppressWarnings("unused") // Used by Avro reflection.
+ static class Bird {
+ long number;
+ @Nullable String species;
+ @Nullable Double quality;
+ @Nullable Long quantity;
+ @Nullable Long birthday; // Exercises TIMESTAMP.
+ @Nullable Boolean flighted;
+ @Nullable SubBird scion;
+ SubBird[] associates;
+
+ static class SubBird {
+ @Nullable String species;
+
+ public SubBird() {}
+ }
+
+ public Bird() {
+ associates = new SubBird[1];
+ associates[0] = new SubBird();
+ }
+ }
+}
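
The expected rows in the test above pin down the JSON-level conversion semantics: BigQuery
INTEGER values surface as decimal strings in a TableRow, unset NULLABLE fields are omitted
entirely, and REPEATED fields default to an empty list. A simplified sketch of the scalar
part of that mapping follows; toJsonValue is a hypothetical helper written for illustration,
not the actual BigQueryAvroUtils implementation:

    import com.google.api.services.bigquery.model.TableRow;

    public class ConversionSketch {
      // Simplified scalar mapping mirroring the semantics the test asserts: BigQuery
      // INTEGER values travel as decimal strings in the JSON API, while FLOAT and
      // BOOLEAN keep their Java types. (Sketch only; BigQueryAvroUtils also handles
      // records, timestamps, and repeated fields.)
      static Object toJsonValue(String bqType, Object avroValue) {
        if (avroValue == null) {
          return null;  // Unset NULLABLE fields are simply omitted from the TableRow.
        }
        switch (bqType) {
          case "INTEGER":
            return avroValue.toString();  // e.g. 5L -> "5"
          case "FLOAT":
          case "BOOLEAN":
            return avroValue;
          default:
            throw new UnsupportedOperationException(
                "Type not covered by this sketch: " + bqType);
        }
      }

      public static void main(String[] args) {
        TableRow row = new TableRow();
        Object number = toJsonValue("INTEGER", 5L);
        if (number != null) {
          row.set("number", number);
        }
        // The "number" value is stored as the string "5", as in the test above.
        System.out.println(row);
      }
    }
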