Posted to commits@beam.apache.org by dh...@apache.org on 2016/07/20 20:02:59 UTC
[04/10] incubator-beam git commit: BigQueryIO: move to google-cloud-platform module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b240525a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
new file mode 100644
index 0000000..130d444
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -0,0 +1,2446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.Coder.Context;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StandardCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.AvroSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
+import org.apache.beam.sdk.util.IOChannelFactory;
+import org.apache.beam.sdk.util.IOChannelUtils;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.Reshuffle;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.Transport;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.hadoop.util.ApiErrorExtractor;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.avro.generic.GenericRecord;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link PTransform}s for reading and writing
+ * <a href="https://developers.google.com/bigquery/">BigQuery</a> tables.
+ *
+ * <h3>Table References</h3>
+ * <p>A fully-qualified BigQuery table name consists of three components:
+ * <ul>
+ * <li>{@code projectId}: the Cloud project id (defaults to
+ * {@link GcpOptions#getProject()}).
+ * <li>{@code datasetId}: the BigQuery dataset id, unique within a project.
+ * <li>{@code tableId}: a table id, unique within a dataset.
+ * </ul>
+ *
+ * <p>BigQuery table references are stored as a {@link TableReference}, which comes
+ * from the <a href="https://cloud.google.com/bigquery/client-libraries">
+ * BigQuery Java Client API</a>.
+ * Tables can be referred to as Strings, with or without the {@code projectId}.
+ * A helper function is provided ({@link BigQueryIO#parseTableSpec(String)})
+ * that parses the following string forms into a {@link TableReference}:
+ *
+ * <ul>
+ * <li>[{@code project_id}]:[{@code dataset_id}].[{@code table_id}]
+ * <li>[{@code dataset_id}].[{@code table_id}]
+ * </ul>
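+ *
+ * <p>For example (an illustrative sketch; the project, dataset, and table names are
+ * placeholders):
+ * <pre>{@code
+ * // Both forms below are placeholders, shown only to illustrate the accepted spec formats.
+ * TableReference ref = BigQueryIO.parseTableSpec("my-project:my_dataset.my_table");
+ * TableReference sameProject = BigQueryIO.parseTableSpec("my_dataset.my_table");
+ * }</pre>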
+ *
+ * <h3>Reading</h3>
+ * <p>To read from a BigQuery table, apply a {@link BigQueryIO.Read} transformation.
+ * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
+ * <pre>{@code
+ * PCollection<TableRow> shakespeare = pipeline.apply(
+ * BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
+ * }</pre>
+ *
+ * <p>See {@link TableRow} for more information on the {@link TableRow} object.
+ *
+ * <p>Users may provide a query to read from rather than reading all of a BigQuery table. If
+ * specified, the result obtained by executing the specified query will be used as the data of the
+ * input transform.
+ *
+ * <pre>{@code
+ * PCollection<TableRow> shakespeare = pipeline.apply(
+ * BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
+ * }</pre>
+ *
+ * <p>When creating a BigQuery input transform, users should provide either a query or a table.
+ * Pipeline construction will fail with a validation error if neither or both are specified.
+ *
+ * <h3>Writing</h3>
+ * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation.
+ * This consumes a {@link PCollection} of {@link TableRow TableRows} as input.
+ * <pre>{@code
+ * PCollection<TableRow> quotes = ...
+ *
+ * List<TableFieldSchema> fields = new ArrayList<>();
+ * fields.add(new TableFieldSchema().setName("source").setType("STRING"));
+ * fields.add(new TableFieldSchema().setName("quote").setType("STRING"));
+ * TableSchema schema = new TableSchema().setFields(fields);
+ *
+ * quotes.apply(BigQueryIO.Write
+ * .to("my-project:output.output_table")
+ * .withSchema(schema)
+ * .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
+ * }</pre>
+ *
+ * <p>See {@link BigQueryIO.Write} for details on how to specify if a write should
+ * append to an existing table, replace the table, or verify that the table is
+ * empty. Note that the dataset being written to must already exist. Unbounded PCollections can only
+ * be written using {@link WriteDisposition#WRITE_EMPTY} or {@link WriteDisposition#WRITE_APPEND}.
+ *
+ * <h3>Sharding BigQuery output tables</h3>
+ * <p>A common use case is to dynamically generate BigQuery table names based on
+ * the current window. To support this,
+ * {@link BigQueryIO.Write#to(SerializableFunction)}
+ * accepts a function mapping the current window to a tablespec. For example,
+ * here's code that outputs daily tables to BigQuery:
+ * <pre>{@code
+ * PCollection<TableRow> quotes = ...
+ * quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
+ * .apply(BigQueryIO.Write
+ * .withSchema(schema)
+ * .to(new SerializableFunction<BoundedWindow, String>() {
+ * public String apply(BoundedWindow window) {
+ * // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
+ * String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
+ * .withZone(DateTimeZone.UTC)
+ * .print(((IntervalWindow) window).start());
+ * return "my-project:output.output_table_" + dayString;
+ * }
+ * }));
+ * }</pre>
+ *
+ * <p>Per-window tables are not yet supported in batch mode.
+ *
+ * <h3>Permissions</h3>
+ * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
+ * pipeline. Please refer to the documentation of the corresponding {@link PipelineRunner}s for
+ * more details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
+ * </a> for security and permission related information specific to BigQuery.
+ */
+public class BigQueryIO {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
+
+ /**
+ * Singleton instance of the JSON factory used to read and write JSON
+ * formatted rows.
+ */
+ private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
+
+ /**
+ * Project IDs must contain 6-63 lowercase letters, digits, or dashes.
+ * IDs must start with a letter and may not end with a dash.
+ * This regex isn't exact: it allows some patterns that would be rejected by
+ * the service, but it is sufficient for basic parsing of table references.
+ */
+ private static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]{4,61}[a-z0-9]";
+
+ /**
+ * Regular expression that matches Dataset IDs.
+ */
+ private static final String DATASET_REGEXP = "[-\\w.]{1,1024}";
+
+ /**
+ * Regular expression that matches Table IDs.
+ */
+ private static final String TABLE_REGEXP = "[-\\w$@]{1,1024}";
+
+ /**
+ * Matches table specifications in the form {@code "[project_id]:[dataset_id].[table_id]"} or
+ * {@code "[dataset_id].[table_id]"}.
+ */
+ private static final String DATASET_TABLE_REGEXP =
+ String.format("((?<PROJECT>%s):)?(?<DATASET>%s)\\.(?<TABLE>%s)", PROJECT_ID_REGEXP,
+ DATASET_REGEXP, TABLE_REGEXP);
+
+ private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
+
+ // TODO: make this private and remove improper access from BigQueryIOTranslator.
+ public static final String SET_PROJECT_FROM_OPTIONS_WARNING =
+ "No project specified for BigQuery table \"%1$s.%2$s\". Assuming it is in \"%3$s\". If the"
+ + " table is in a different project please specify it as a part of the BigQuery table"
+ + " definition.";
+
+ private static final String RESOURCE_NOT_FOUND_ERROR =
+ "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
+ + " execution. If the %1$s is created by an earlier stage of the pipeline, this"
+ + " validation can be disabled using #withoutValidation.";
+
+ private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR =
+ "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by"
+ + " an earlier stage of the pipeline, this validation can be disabled using"
+ + " #withoutValidation.";
+
+ /**
+ * Parse a table specification in the form
+ * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}.
+ *
+ * <p>If the project id is omitted, the default project id is used.
+ */
+ public static TableReference parseTableSpec(String tableSpec) {
+ Matcher match = TABLE_SPEC.matcher(tableSpec);
+ if (!match.matches()) {
+ throw new IllegalArgumentException(
+ "Table reference is not in [project_id]:[dataset_id].[table_id] "
+ + "format: " + tableSpec);
+ }
+
+ TableReference ref = new TableReference();
+ ref.setProjectId(match.group("PROJECT"));
+
+ return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
+ }
+
+ /**
+ * Returns a canonical string representation of the {@link TableReference}.
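+ *
+ * <p>For example (a hypothetical round trip; the table spec is a placeholder):
+ * <pre>{@code
+ * // "my-project:logs.events" is a placeholder spec.
+ * String spec = BigQueryIO.toTableSpec(BigQueryIO.parseTableSpec("my-project:logs.events"));
+ * // spec is "my-project:logs.events"
+ * }</pre>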
+ */
+ public static String toTableSpec(TableReference ref) {
+ StringBuilder sb = new StringBuilder();
+ if (ref.getProjectId() != null) {
+ sb.append(ref.getProjectId());
+ sb.append(":");
+ }
+
+ sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
+ return sb.toString();
+ }
+
+ /**
+ * A {@link PTransform} that reads from a BigQuery table and returns a
+ * {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table.
+ *
+ * <p>Each {@link TableRow} contains values indexed by column name. Here is a
+ * sample processing function that processes a "line" column from rows:
+ * <pre>{@code
+ * static class ExtractWordsFn extends DoFn<TableRow, String> {
+ * public void processElement(ProcessContext c) {
+ * // Get the "line" field of the TableRow object, split it into words, and emit them.
+ * TableRow row = c.element();
+ * String[] words = row.get("line").toString().split("[^a-zA-Z']+");
+ * for (String word : words) {
+ * if (!word.isEmpty()) {
+ * c.output(word);
+ * }
+ * }
+ * }
+ * }}</pre>
+ */
+ public static class Read {
+
+ /**
+ * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
+ * {@code "[dataset_id].[table_id]"} for tables within the current project.
+ */
+ public static Bound from(String tableSpec) {
+ return new Bound().from(tableSpec);
+ }
+
+ /**
+ * Reads results received after executing the given query.
+ */
+ public static Bound fromQuery(String query) {
+ return new Bound().fromQuery(query);
+ }
+
+ /**
+ * Reads a BigQuery table specified as a {@link TableReference} object.
+ */
+ public static Bound from(TableReference table) {
+ return new Bound().from(table);
+ }
+
+ /**
+ * Disables BigQuery table validation, which is enabled by default.
+ */
+ public static Bound withoutValidation() {
+ return new Bound().withoutValidation();
+ }
+
+ /**
+ * A {@link PTransform} that reads from a BigQuery table and returns a bounded
+ * {@link PCollection} of {@link TableRow TableRows}.
+ */
+ public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
+ @Nullable final String jsonTableRef;
+ @Nullable final String query;
+
+ /**
+ * Disable validation that the table exists or the query succeeds prior to pipeline
+ * submission. Basic validation (such as ensuring that a query or table is specified) still
+ * occurs.
+ */
+ final boolean validate;
+ @Nullable final Boolean flattenResults;
+ @Nullable BigQueryServices bigQueryServices;
+
+ private static final String QUERY_VALIDATION_FAILURE_ERROR =
+ "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ + " pipeline, This validation can be disabled using #withoutValidation.";
+
+ // The maximum number of retries to poll a BigQuery job in the cleanup phase.
+ // We expect the jobs to already be DONE, so we don't need a high retry limit.
+ private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10;
+
+ private Bound() {
+ this(
+ null /* name */,
+ null /* query */,
+ null /* jsonTableRef */,
+ true /* validate */,
+ null /* flattenResults */,
+ null /* bigQueryServices */);
+ }
+
+ private Bound(
+ String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate,
+ @Nullable Boolean flattenResults, @Nullable BigQueryServices bigQueryServices) {
+ super(name);
+ this.jsonTableRef = jsonTableRef;
+ this.query = query;
+ this.validate = validate;
+ this.flattenResults = flattenResults;
+ this.bigQueryServices = bigQueryServices;
+ }
+
+ /**
+ * Returns a copy of this transform that reads from the specified table. Refer to
+ * {@link #parseTableSpec(String)} for the specification format.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound from(String tableSpec) {
+ return from(parseTableSpec(tableSpec));
+ }
+
+ /**
+ * Returns a copy of this transform that reads from the specified table.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound from(TableReference table) {
+ return new Bound(
+ name, query, toJsonString(table), validate, flattenResults, bigQueryServices);
+ }
+
+ /**
+ * Returns a copy of this transform that reads the results of the specified query.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>By default, the query results will be flattened -- see
+ * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
+ * Jobs documentation</a> for more information. To disable flattening, use
+ * {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
+ */
+ public Bound fromQuery(String query) {
+ return new Bound(name, query, jsonTableRef, validate,
+ MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), bigQueryServices);
+ }
+
+ /**
+ * Disable validation that the table exists or the query succeeds prior to pipeline
+ * submission. Basic validation (such as ensuring that a query or table is specified) still
+ * occurs.
+ */
+ public Bound withoutValidation() {
+ return new Bound(name, query, jsonTableRef, false, flattenResults, bigQueryServices);
+ }
+
+ /**
+ * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
+ * flattening of query results</a>.
+ *
+ * <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
+ * from a table will cause an error during validation.
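+ *
+ * <p>For example (an illustrative sketch; the pipeline and query are placeholders):
+ * <pre>{@code
+ * // The query below is a placeholder; nested fields are returned unflattened.
+ * PCollection<TableRow> rows = pipeline.apply(
+ *     BigQueryIO.Read.fromQuery("SELECT fullName, children FROM dataset.persons")
+ *         .withoutResultFlattening());
+ * }</pre>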
+ */
+ public Bound withoutResultFlattening() {
+ return new Bound(name, query, jsonTableRef, validate, false, bigQueryServices);
+ }
+
+ @VisibleForTesting
+ Bound withTestServices(BigQueryServices testServices) {
+ return new Bound(name, query, jsonTableRef, validate, flattenResults, testServices);
+ }
+
+ @Override
+ public void validate(PInput input) {
+ // Even if existence validation is disabled, we need to make sure that the BigQueryIO
+ // read is properly specified.
+ BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ TableReference table = getTableWithDefaultProject(bqOptions);
+ if (table == null && query == null) {
+ throw new IllegalStateException(
+ "Invalid BigQuery read operation, either table reference or query has to be set");
+ } else if (table != null && query != null) {
+ throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
+ + " query and a table, only one of these should be provided");
+ } else if (table != null && flattenResults != null) {
+ throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+ + " table with a result flattening preference, which is not configurable");
+ } else if (query != null && flattenResults == null) {
+ throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+ + " query without a result flattening preference");
+ }
+
+ if (validate) {
+ BigQueryServices bqServices = getBigQueryServices();
+ // Check for source table/query presence for early failure notification.
+ // Note that a presence check can fail if the table or dataset are created by earlier
+ // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these
+ // cases the withoutValidation method can be used to disable the check.
+ if (table != null) {
+ DatasetService datasetService = bqServices.getDatasetService(bqOptions);
+ verifyDatasetPresence(datasetService, table);
+ verifyTablePresence(datasetService, table);
+ }
+ if (query != null) {
+ JobService jobService = bqServices.getJobService(bqOptions);
+ try {
+ jobService.dryRunQuery(bqOptions.getProject(), query);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(QUERY_VALIDATION_FAILURE_ERROR, query), e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public PCollection<TableRow> apply(PInput input) {
+ String uuid = randomUUIDString();
+ final String jobIdToken = "beam_job_" + uuid;
+
+ BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ BoundedSource<TableRow> source;
+ final BigQueryServices bqServices = getBigQueryServices();
+
+ final String extractDestinationDir;
+ String tempLocation = bqOptions.getTempLocation();
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ extractDestinationDir = factory.resolve(tempLocation, uuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve extract destination directory in %s", tempLocation));
+ }
+
+ final String executingProject = bqOptions.getProject();
+ if (!Strings.isNullOrEmpty(query)) {
+ String queryTempDatasetId = "temp_dataset_" + uuid;
+ String queryTempTableId = "temp_table_" + uuid;
+
+ TableReference queryTempTableRef = new TableReference()
+ .setProjectId(executingProject)
+ .setDatasetId(queryTempDatasetId)
+ .setTableId(queryTempTableId);
+
+ source = BigQueryQuerySource.create(
+ jobIdToken, query, queryTempTableRef, flattenResults,
+ extractDestinationDir, bqServices);
+ } else {
+ TableReference inputTable = getTableWithDefaultProject(bqOptions);
+ source = BigQueryTableSource.create(
+ jobIdToken, inputTable, extractDestinationDir, bqServices, executingProject);
+ }
+ PassThroughThenCleanup.CleanupOperation cleanupOperation =
+ new PassThroughThenCleanup.CleanupOperation() {
+ @Override
+ void cleanup(PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+
+ JobReference jobRef = new JobReference()
+ .setProjectId(executingProject)
+ .setJobId(getExtractJobId(jobIdToken));
+ Job extractJob = bqServices.getJobService(bqOptions).pollJob(
+ jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);
+
+ Collection<String> extractFiles = null;
+ if (extractJob != null) {
+ extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+ } else {
+ IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+ Collection<String> dirMatch = factory.match(extractDestinationDir);
+ if (!dirMatch.isEmpty()) {
+ extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+ }
+ }
+ if (extractFiles != null && !extractFiles.isEmpty()) {
+ new GcsUtilFactory().create(options).remove(extractFiles);
+ }
+ }};
+ return input.getPipeline()
+ .apply(org.apache.beam.sdk.io.Read.from(source))
+ .setCoder(getDefaultOutputCoder())
+ .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
+ }
+
+ @Override
+ protected Coder<TableRow> getDefaultOutputCoder() {
+ return TableRowJsonCoder.of();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ TableReference table = getTable();
+
+ if (table != null) {
+ builder.add(DisplayData.item("table", toTableSpec(table))
+ .withLabel("Table"));
+ }
+
+ builder
+ .addIfNotNull(DisplayData.item("query", query)
+ .withLabel("Query"))
+ .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+ .withLabel("Flatten Query Results"))
+ .addIfNotDefault(DisplayData.item("validation", validate)
+ .withLabel("Validation Enabled"),
+ true);
+ }
+
+ /**
+ * Returns the table to read, or {@code null} if reading from a query instead.
+ *
+ * <p>If the table's project is not specified, use the executing project.
+ */
+ @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
+ TableReference table = getTable();
+ if (table != null && Strings.isNullOrEmpty(table.getProjectId())) {
+ // If user does not specify a project we assume the table to be located in
+ // the default project.
+ table.setProjectId(bqOptions.getProject());
+ }
+ return table;
+ }
+
+ /**
+ * Returns the table to read, or {@code null} if reading from a query instead.
+ */
+ @Nullable
+ public TableReference getTable() {
+ return fromJsonString(jsonTableRef, TableReference.class);
+ }
+
+ /**
+ * Returns the query to be read, or {@code null} if reading from a table instead.
+ */
+ public String getQuery() {
+ return query;
+ }
+
+ /**
+ * Returns true if table validation is enabled.
+ */
+ public boolean getValidate() {
+ return validate;
+ }
+
+ /**
+ * Returns {@code true} if result flattening is enabled, {@code false} if it is disabled,
+ * or {@code null} if not applicable.
+ */
+ public Boolean getFlattenResults() {
+ return flattenResults;
+ }
+
+ private BigQueryServices getBigQueryServices() {
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
+ }
+ return bigQueryServices;
+ }
+ }
+
+ /** Disallow construction of utility class. */
+ private Read() {}
+ }
+
+ /**
+ * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
+ * has been processed.
+ */
+ @VisibleForTesting
+ static class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
+ private CleanupOperation cleanupOperation;
+
+ PassThroughThenCleanup(CleanupOperation cleanupOperation) {
+ this.cleanupOperation = cleanupOperation;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ TupleTag<T> mainOutput = new TupleTag<>();
+ TupleTag<Void> cleanupSignal = new TupleTag<>();
+ PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
+ .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
+
+ PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
+ .setCoder(VoidCoder.of())
+ .apply(View.<Void>asSingleton().withDefaultValue(null));
+
+ input.getPipeline()
+ .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
+ .apply("Cleanup", ParDo.of(
+ new DoFn<CleanupOperation, Void>() {
+ @Override
+ public void processElement(ProcessContext c)
+ throws Exception {
+ c.element().cleanup(c.getPipelineOptions());
+ }
+ }).withSideInputs(cleanupSignalView));
+
+ return outputs.get(mainOutput);
+ }
+
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ }
+
+ abstract static class CleanupOperation implements Serializable {
+ abstract void cleanup(PipelineOptions options) throws Exception;
+ }
+ }
+
+ /**
+ * A {@link BigQuerySourceBase} for reading BigQuery tables.
+ */
+ @VisibleForTesting
+ static class BigQueryTableSource extends BigQuerySourceBase {
+
+ static BigQueryTableSource create(
+ String jobIdToken,
+ TableReference table,
+ String extractDestinationDir,
+ BigQueryServices bqServices,
+ String executingProject) {
+ return new BigQueryTableSource(
+ jobIdToken, table, extractDestinationDir, bqServices, executingProject);
+ }
+
+ private final String jsonTable;
+ private final AtomicReference<Long> tableSizeBytes;
+
+ private BigQueryTableSource(
+ String jobIdToken,
+ TableReference table,
+ String extractDestinationDir,
+ BigQueryServices bqServices,
+ String executingProject) {
+ super(jobIdToken, extractDestinationDir, bqServices, executingProject);
+ checkNotNull(table, "table");
+ this.jsonTable = toJsonString(table);
+ this.tableSizeBytes = new AtomicReference<>();
+ }
+
+ @Override
+ protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
+ return JSON_FACTORY.fromString(jsonTable, TableReference.class);
+ }
+
+ @Override
+ public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ TableReference tableRef = JSON_FACTORY.fromString(jsonTable, TableReference.class);
+ return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
+ }
+
+ @Override
+ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ if (tableSizeBytes.get() == null) {
+ TableReference table = JSON_FACTORY.fromString(jsonTable, TableReference.class);
+
+ Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
+ .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId())
+ .getNumBytes();
+ tableSizeBytes.compareAndSet(null, numBytes);
+ }
+ return tableSizeBytes.get();
+ }
+
+ @Override
+ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+ // Do nothing.
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("table", jsonTable));
+ }
+ }
+
+ /**
+ * A {@link BigQuerySourceBase} for querying BigQuery tables.
+ */
+ @VisibleForTesting
+ static class BigQueryQuerySource extends BigQuerySourceBase {
+
+ static BigQueryQuerySource create(
+ String jobIdToken,
+ String query,
+ TableReference queryTempTableRef,
+ Boolean flattenResults,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ return new BigQueryQuerySource(
+ jobIdToken,
+ query,
+ queryTempTableRef,
+ flattenResults,
+ extractDestinationDir,
+ bqServices);
+ }
+
+ private final String query;
+ private final String jsonQueryTempTable;
+ private final Boolean flattenResults;
+ private transient AtomicReference<JobStatistics> dryRunJobStats;
+
+ private BigQueryQuerySource(
+ String jobIdToken,
+ String query,
+ TableReference queryTempTableRef,
+ Boolean flattenResults,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ super(jobIdToken, extractDestinationDir, bqServices,
+ checkNotNull(queryTempTableRef, "queryTempTableRef").getProjectId());
+ this.query = checkNotNull(query, "query");
+ this.jsonQueryTempTable = toJsonString(queryTempTableRef);
+ this.flattenResults = checkNotNull(flattenResults, "flattenResults");
+ this.dryRunJobStats = new AtomicReference<>();
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
+ }
+
+ @Override
+ public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ return new BigQueryReader(this, bqServices.getReaderFromQuery(
+ bqOptions, query, executingProject, flattenResults));
+ }
+
+ @Override
+ protected TableReference getTableToExtract(BigQueryOptions bqOptions)
+ throws IOException, InterruptedException {
+ // 1. Find the location of the query.
+ TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions)
+ .getQuery()
+ .getReferencedTables()
+ .get(0);
+ DatasetService tableService = bqServices.getDatasetService(bqOptions);
+ String location = tableService.getTable(
+ dryRunTempTable.getProjectId(),
+ dryRunTempTable.getDatasetId(),
+ dryRunTempTable.getTableId()).getLocation();
+
+ // 2. Create the temporary dataset in the query location.
+ TableReference tableToExtract =
+ JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
+ tableService.createDataset(
+ tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
+
+ // 3. Execute the query.
+ String queryJobId = jobIdToken + "-query";
+ executeQuery(
+ executingProject,
+ queryJobId,
+ query,
+ tableToExtract,
+ flattenResults,
+ bqServices.getJobService(bqOptions));
+ return tableToExtract;
+ }
+
+ @Override
+ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+ TableReference tableToRemove =
+ JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
+
+ DatasetService tableService = bqServices.getDatasetService(bqOptions);
+ tableService.deleteTable(
+ tableToRemove.getProjectId(),
+ tableToRemove.getDatasetId(),
+ tableToRemove.getTableId());
+ tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("query", query));
+ }
+
+ private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
+ throws InterruptedException, IOException {
+ if (dryRunJobStats.get() == null) {
+ JobStatistics jobStats =
+ bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query);
+ dryRunJobStats.compareAndSet(null, jobStats);
+ }
+ return dryRunJobStats.get();
+ }
+
+ private static void executeQuery(
+ String executingProject,
+ String jobId,
+ String query,
+ TableReference destinationTable,
+ boolean flattenResults,
+ JobService jobService) throws IOException, InterruptedException {
+ JobReference jobRef = new JobReference()
+ .setProjectId(executingProject)
+ .setJobId(jobId);
+ JobConfigurationQuery queryConfig = new JobConfigurationQuery();
+ queryConfig
+ .setQuery(query)
+ .setAllowLargeResults(true)
+ .setCreateDisposition("CREATE_IF_NEEDED")
+ .setDestinationTable(destinationTable)
+ .setFlattenResults(flattenResults)
+ .setPriority("BATCH")
+ .setWriteDisposition("WRITE_EMPTY");
+ jobService.startQueryJob(jobRef, queryConfig);
+ Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ if (parseStatus(job) != Status.SUCCEEDED) {
+ throw new IOException("Query job failed: " + jobId);
+ }
+ return;
+ }
+
+ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+ in.defaultReadObject();
+ dryRunJobStats = new AtomicReference<>();
+ }
+ }
+
+ /**
+ * An abstract {@link BoundedSource} to read a table from BigQuery.
+ *
+ * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
+ * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource}
+ * and {@link BigQueryQuerySource}, depending on the configuration of the read.
+ * Specifically,
+ * <ul>
+ * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
+ * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
+ * </ul>
+ * ...
+ */
+ private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
+ // The maximum number of attempts to verify temp files.
+ private static final int MAX_FILES_VERIFY_ATTEMPTS = 10;
+
+ // The maximum number of retries to poll a BigQuery job.
+ protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+ // The initial backoff for verifying temp files.
+ private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+
+ protected final String jobIdToken;
+ protected final String extractDestinationDir;
+ protected final BigQueryServices bqServices;
+ protected final String executingProject;
+
+ private BigQuerySourceBase(
+ String jobIdToken,
+ String extractDestinationDir,
+ BigQueryServices bqServices,
+ String executingProject) {
+ this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+ this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
+ this.bqServices = checkNotNull(bqServices, "bqServices");
+ this.executingProject = checkNotNull(executingProject, "executingProject");
+ }
+
+ @Override
+ public List<BoundedSource<TableRow>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ TableReference tableToExtract = getTableToExtract(bqOptions);
+ JobService jobService = bqServices.getJobService(bqOptions);
+ String extractJobId = getExtractJobId(jobIdToken);
+ List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+ TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
+ tableToExtract.getProjectId(),
+ tableToExtract.getDatasetId(),
+ tableToExtract.getTableId()).getSchema();
+
+ cleanupTempResource(bqOptions);
+ return createSources(tempFiles, tableSchema);
+ }
+
+ protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
+
+ protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void validate() {
+ // Do nothing, validation is done in BigQuery.Read.
+ }
+
+ @Override
+ public Coder<TableRow> getDefaultOutputCoder() {
+ return TableRowJsonCoder.of();
+ }
+
+ private List<String> executeExtract(
+ String jobId, TableReference table, JobService jobService)
+ throws InterruptedException, IOException {
+ JobReference jobRef = new JobReference()
+ .setProjectId(executingProject)
+ .setJobId(jobId);
+
+ String destinationUri = getExtractDestinationUri(extractDestinationDir);
+ JobConfigurationExtract extract = new JobConfigurationExtract()
+ .setSourceTable(table)
+ .setDestinationFormat("AVRO")
+ .setDestinationUris(ImmutableList.of(destinationUri));
+
+ LOG.info("Starting BigQuery extract job: {}", jobId);
+ jobService.startExtractJob(jobRef, extract);
+ Job extractJob =
+ jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ if (parseStatus(extractJob) != Status.SUCCEEDED) {
+ throw new IOException(String.format(
+ "Extract job %s failed, status: %s",
+ extractJob.getJobReference().getJobId(), extractJob.getStatus()));
+ }
+
+ List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+ return ImmutableList.copyOf(tempFiles);
+ }
+
+ private List<BoundedSource<TableRow>> createSources(
+ List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+ final String jsonSchema = JSON_FACTORY.toString(tableSchema);
+
+ SerializableFunction<GenericRecord, TableRow> function =
+ new SerializableFunction<GenericRecord, TableRow>() {
+ @Override
+ public TableRow apply(GenericRecord input) {
+ return BigQueryAvroUtils.convertGenericRecordToTableRow(
+ input, fromJsonString(jsonSchema, TableSchema.class));
+ }};
+
+ List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
+ BackOff backoff = new AttemptBoundedExponentialBackOff(
+ MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS);
+ for (String fileName : files) {
+ while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+ if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) {
+ break;
+ }
+ }
+ avroSources.add(new TransformingSource<>(
+ AvroSource.from(fileName), function, getDefaultOutputCoder()));
+ }
+ return ImmutableList.copyOf(avroSources);
+ }
+
+ protected static class BigQueryReader extends BoundedSource.BoundedReader<TableRow> {
+ private final BigQuerySourceBase source;
+ private final BigQueryServices.BigQueryJsonReader reader;
+
+ private BigQueryReader(
+ BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
+ this.source = source;
+ this.reader = reader;
+ }
+
+ @Override
+ public BoundedSource<TableRow> getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return reader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return reader.advance();
+ }
+
+ @Override
+ public TableRow getCurrent() throws NoSuchElementException {
+ return reader.getCurrent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ }
+ }
+
+ /**
+ * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
+ * and transforms elements to type {@code V}.
+ */
+ @VisibleForTesting
+ static class TransformingSource<T, V> extends BoundedSource<V> {
+ private final BoundedSource<T> boundedSource;
+ private final SerializableFunction<T, V> function;
+ private final Coder<V> outputCoder;
+
+ TransformingSource(
+ BoundedSource<T> boundedSource,
+ SerializableFunction<T, V> function,
+ Coder<V> outputCoder) {
+ this.boundedSource = boundedSource;
+ this.function = function;
+ this.outputCoder = outputCoder;
+ }
+
+ @Override
+ public List<? extends BoundedSource<V>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ return Lists.transform(
+ boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
+ new Function<BoundedSource<T>, BoundedSource<V>>() {
+ @Override
+ public BoundedSource<V> apply(BoundedSource<T> input) {
+ return new TransformingSource<>(input, function, outputCoder);
+ }
+ });
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return boundedSource.getEstimatedSizeBytes(options);
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return boundedSource.producesSortedKeys(options);
+ }
+
+ @Override
+ public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
+ return new TransformingReader(boundedSource.createReader(options));
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+ @Override
+ public Coder<V> getDefaultOutputCoder() {
+ return outputCoder;
+ }
+
+ private class TransformingReader extends BoundedReader<V> {
+ private final BoundedReader<T> boundedReader;
+
+ private TransformingReader(BoundedReader<T> boundedReader) {
+ this.boundedReader = boundedReader;
+ }
+
+ @Override
+ public synchronized BoundedSource<V> getCurrentSource() {
+ return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return boundedReader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return boundedReader.advance();
+ }
+
+ @Override
+ public V getCurrent() throws NoSuchElementException {
+ T current = boundedReader.getCurrent();
+ return function.apply(current);
+ }
+
+ @Override
+ public void close() throws IOException {
+ boundedReader.close();
+ }
+
+ @Override
+ public synchronized BoundedSource<V> splitAtFraction(double fraction) {
+ return new TransformingSource<>(
+ boundedReader.splitAtFraction(fraction), function, outputCoder);
+ }
+
+ @Override
+ public Double getFractionConsumed() {
+ return boundedReader.getFractionConsumed();
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return boundedReader.getCurrentTimestamp();
+ }
+ }
+ }
+
+ private static String getExtractJobId(String jobIdToken) {
+ return jobIdToken + "-extract";
+ }
+
+ private static String getExtractDestinationUri(String extractDestinationDir) {
+ return String.format("%s/%s", extractDestinationDir, "*.avro");
+ }
+
+ private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+ throws IOException {
+ JobStatistics jobStats = extractJob.getStatistics();
+ List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
+ if (counts.size() != 1) {
+ String errorMessage = (counts.size() == 0
+ ? "No destination uri file count received."
+ : String.format("More than one destination uri file count received. First two are %s, %s",
+ counts.get(0), counts.get(1)));
+ throw new RuntimeException(errorMessage);
+ }
+ long filesCount = counts.get(0);
+
+ ImmutableList.Builder<String> paths = ImmutableList.builder();
+ IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+ for (long i = 0; i < filesCount; ++i) {
+ String filePath =
+ factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
+ paths.add(filePath);
+ }
+ return paths.build();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows}
+ * to a BigQuery table.
+ *
+ * <p>In BigQuery, each table has an enclosing dataset. The dataset being written must already
+ * exist.
+ *
+ * <p>By default, tables will be created if they do not exist, which corresponds to a
+ * {@link CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of BigQuery's
+ * Jobs API. A schema must be provided (via {@link BigQueryIO.Write#withSchema(TableSchema)}),
+ * or else the transform may fail at runtime with an {@link IllegalArgumentException}.
+ *
+ * <p>By default, writes require an empty table, which corresponds to
+ * a {@link WriteDisposition#WRITE_EMPTY} disposition that matches the
+ * default of BigQuery's Jobs API.
+ *
+ * <p>Here is a sample transform that produces TableRow values containing
+ * "word" and "count" columns:
+ * <pre>{@code
+ * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
+ * public void processElement(ProcessContext c) {
+ * TableRow row = new TableRow()
+ * .set("word", c.element().getKey())
+ * .set("count", c.element().getValue().intValue());
+ * c.output(row);
+ * }
+ * }}</pre>
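+ *
+ * <p>The formatted rows can then be written, for example (an illustrative sketch; the input
+ * collection, table spec, and schema are placeholders):
+ * <pre>{@code
+ * PCollection<KV<String, Long>> wordCounts = ...;
+ * // schema: a TableSchema with "word" and "count" fields, built as in the class-level example.
+ * wordCounts.apply(ParDo.of(new FormatCountsFn()))
+ *     .apply(BigQueryIO.Write
+ *         .to("my-project:output.word_counts")
+ *         .withSchema(schema)
+ *         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
+ * }</pre>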
+ */
+ public static class Write {
+ /**
+ * An enumeration type for the BigQuery create disposition strings.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition">
+ * <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a>
+ */
+ public enum CreateDisposition {
+ /**
+ * Specifies that tables should not be created.
+ *
+ * <p>If the output table does not exist, the write fails.
+ */
+ CREATE_NEVER,
+
+ /**
+ * Specifies that tables should be created if needed. This is the default
+ * behavior.
+ *
+ * <p>Requires that a table schema is provided via {@link BigQueryIO.Write#withSchema}.
+ * This precondition is checked before starting a job. The schema is
+ * not required to match an existing table's schema.
+ *
+ * <p>When this transformation is executed, if the output table does not
+ * exist, the table is created from the provided schema. Note that even if
+ * the table exists, it may be recreated if necessary when paired with a
+ * {@link WriteDisposition#WRITE_TRUNCATE}.
+ */
+ CREATE_IF_NEEDED
+ }
+
+ /**
+ * An enumeration type for the BigQuery write disposition strings.
+ *
+ * @see <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition">
+ * <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a>
+ */
+ public enum WriteDisposition {
+ /**
+ * Specifies that write should replace a table.
+ *
+ * <p>The replacement may occur in multiple steps - for instance by first
+ * removing the existing table, then creating a replacement, then filling
+ * it in. This is not an atomic operation, and external programs may
+ * see the table in any of these intermediate steps.
+ */
+ WRITE_TRUNCATE,
+
+ /**
+ * Specifies that rows may be appended to an existing table.
+ */
+ WRITE_APPEND,
+
+ /**
+ * Specifies that the output table must be empty. This is the default
+ * behavior.
+ *
+ * <p>If the output table is not empty, the write fails at runtime.
+ *
+ * <p>This check may occur long before data is written, and does not
+ * guarantee exclusive access to the table. If two programs are run
+ * concurrently, each specifying the same output table and
+ * a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is possible
+ * for both to succeed.
+ */
+ WRITE_EMPTY
+ }
+
+ /**
+ * Creates a write transformation for the given table specification.
+ *
+ * <p>Refer to {@link #parseTableSpec(String)} for the specification format.
+ */
+ public static Bound to(String tableSpec) {
+ return new Bound().to(tableSpec);
+ }
+
+ /** Creates a write transformation for the given table. */
+ public static Bound to(TableReference table) {
+ return new Bound().to(table);
+ }
+
+ /**
+ * Creates a write transformation from a function that maps windows to table specifications.
+ * Each time a new window is encountered, this function will be called and the resulting table
+ * will be created. Records within that window will be written to the associated table.
+ *
+ * <p>See {@link #parseTableSpec(String)} for the format that {@code tableSpecFunction} should
+ * return.
+ *
+ * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it should
+ * always return the same table specification.
+ */
+ public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ return new Bound().to(tableSpecFunction);
+ }
+
+ /**
+ * Creates a write transformation from a function that maps windows to {@link TableReference}
+ * objects.
+ *
+ * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
+ * always return the same table reference.
+ */
+ public static Bound toTableReference(
+ SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
+ return new Bound().toTableReference(tableRefFunction);
+ }
+
+ /**
+ * Creates a write transformation with the specified schema to use in table creation.
+ *
+ * <p>The schema is <i>required</i> only if writing to a table that does not already
+ * exist, and {@link CreateDisposition} is set to
+ * {@link CreateDisposition#CREATE_IF_NEEDED}.
+ */
+ public static Bound withSchema(TableSchema schema) {
+ return new Bound().withSchema(schema);
+ }
+
+ /** Creates a write transformation with the specified options for creating the table. */
+ public static Bound withCreateDisposition(CreateDisposition disposition) {
+ return new Bound().withCreateDisposition(disposition);
+ }
+
+ /** Creates a write transformation with the specified options for writing to the table. */
+ public static Bound withWriteDisposition(WriteDisposition disposition) {
+ return new Bound().withWriteDisposition(disposition);
+ }
+
+ /**
+ * Creates a write transformation with BigQuery table validation disabled.
+ */
+ public static Bound withoutValidation() {
+ return new Bound().withoutValidation();
+ }
+
+ /**
+ * A {@link PTransform} that can write either a bounded or unbounded
+ * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table.
+ */
+ public static class Bound extends PTransform<PCollection<TableRow>, PDone> {
+ @Nullable final String jsonTableRef;
+
+ @Nullable final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
+
+ // Table schema. The schema is required only if the table does not exist.
+ @Nullable final String jsonSchema;
+
+ // Options for creating the table. Valid values are CREATE_IF_NEEDED and
+ // CREATE_NEVER.
+ final CreateDisposition createDisposition;
+
+ // Options for writing to the table. Valid values are WRITE_TRUNCATE,
+ // WRITE_APPEND and WRITE_EMPTY.
+ final WriteDisposition writeDisposition;
+
+ // An option to indicate if table validation is desired. Default is true.
+ final boolean validate;
+
+ @Nullable private BigQueryServices bigQueryServices;
+
+ private static class TranslateTableSpecFunction implements
+ SerializableFunction<BoundedWindow, TableReference> {
+ private SerializableFunction<BoundedWindow, String> tableSpecFunction;
+
+ TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ this.tableSpecFunction = tableSpecFunction;
+ }
+
+ @Override
+ public TableReference apply(BoundedWindow value) {
+ return parseTableSpec(tableSpecFunction.apply(value));
+ }
+ }
+
+ /**
+ * @deprecated Should be private. Instead, use one of the factory methods in
+ * {@link BigQueryIO.Write}, such as {@link BigQueryIO.Write#to(String)}, to create an
+ * instance of this class.
+ */
+ @Deprecated
+ public Bound() {
+ this(
+ null /* name */,
+ null /* jsonTableRef */,
+ null /* tableRefFunction */,
+ null /* jsonSchema */,
+ CreateDisposition.CREATE_IF_NEEDED,
+ WriteDisposition.WRITE_EMPTY,
+ true /* validate */,
+ null /* bigQueryServices */);
+ }
+
+ private Bound(String name, @Nullable String jsonTableRef,
+ @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction,
+ @Nullable String jsonSchema,
+ CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate,
+ @Nullable BigQueryServices bigQueryServices) {
+ super(name);
+ this.jsonTableRef = jsonTableRef;
+ this.tableRefFunction = tableRefFunction;
+ this.jsonSchema = jsonSchema;
+ this.createDisposition = checkNotNull(createDisposition, "createDisposition");
+ this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+ this.validate = validate;
+ this.bigQueryServices = bigQueryServices;
+ }
+
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table. Refer to
+ * {@link #parseTableSpec(String)} for the specification format.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound to(String tableSpec) {
+ return to(parseTableSpec(tableSpec));
+ }
+
+ /**
+ * Returns a copy of this write transformation, but writing to the specified table.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound to(TableReference table) {
+ return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, validate, bigQueryServices);
+ }
+
+ /**
+ * Returns a copy of this write transformation, but using the specified function to determine
+ * which table to write to for each window.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>{@code tableSpecFunction} should be deterministic. When given the same window, it
+ * should always return the same table specification.
+ */
+ public Bound to(
+ SerializableFunction<BoundedWindow, String> tableSpecFunction) {
+ return toTableReference(new TranslateTableSpecFunction(tableSpecFunction));
+ }
+
+ /**
+ * Returns a copy of this write transformation, but using the specified function to determine
+ * which table to write to for each window.
+ *
+ * <p>Does not modify this object.
+ *
+ * <p>{@code tableRefFunction} should be deterministic. When given the same window, it should
+ * always return the same table reference.
+ */
+ public Bound toTableReference(
+ SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
+ return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, validate, bigQueryServices);
+ }
+
+ /**
+ * Returns a copy of this write transformation, but using the specified schema for rows
+ * to be written.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound withSchema(TableSchema schema) {
+ return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema),
+ createDisposition, writeDisposition, validate, bigQueryServices);
+ }
+
+ /**
+ * Returns a copy of this write transformation, but using the specified create disposition.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound withCreateDisposition(CreateDisposition createDisposition) {
+ return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, validate, bigQueryServices);
+ }
+
+ /**
+ * Returns a copy of this write transformation, but using the specified write disposition.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound withWriteDisposition(WriteDisposition writeDisposition) {
+ return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, validate, bigQueryServices);
+ }
+
+ /**
+ * Returns a copy of this write transformation, but without BigQuery table validation.
+ *
+ * <p>Does not modify this object.
+ */
+ public Bound withoutValidation() {
+ return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, false, bigQueryServices);
+ }
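+
+ // Illustrative usage sketch (not part of the original class): the table spec,
+ // schema variable, and PCollection below are hypothetical, showing how the
+ // fluent configuration methods above are typically chained.
+ //
+ // PCollection<TableRow> rows = ...;
+ // rows.apply(BigQueryIO.Write
+ // .to("my-project:my_dataset.my_table")
+ // .withSchema(schema)
+ // .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ // .withWriteDisposition(WriteDisposition.WRITE_APPEND));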
+
+ @VisibleForTesting
+ Bound withTestServices(BigQueryServices testServices) {
+ return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
+ writeDisposition, validate, testServices);
+ }
+
+ private static void verifyTableEmpty(
+ DatasetService datasetService,
+ TableReference table) {
+ try {
+ boolean isEmpty = datasetService.isTableEmpty(
+ table.getProjectId(), table.getDatasetId(), table.getTableId());
+ if (!isEmpty) {
+ throw new IllegalArgumentException(
+ "BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
+ }
+ } catch (IOException | InterruptedException e) {
+ ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+ if (e instanceof IOException && errorExtractor.itemNotFound((IOException) e)) {
+ // Nothing to do. If the table does not exist, it is considered empty.
+ } else {
+ throw new RuntimeException(
+ "unable to confirm BigQuery table emptiness for table "
+ + BigQueryIO.toTableSpec(table), e);
+ }
+ }
+ }
+
+ @Override
+ public void validate(PCollection<TableRow> input) {
+ BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ // Exactly one of a table reference and a table function must be configured.
+ checkState(
+ jsonTableRef != null || tableRefFunction != null,
+ "must set the table reference of a BigQueryIO.Write transform");
+ checkState(
+ jsonTableRef == null || tableRefFunction == null,
+ "Cannot set both a table reference and a table function for a BigQueryIO.Write"
+ + " transform");
+
+ // Require a schema if creating one or more tables.
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_IF_NEEDED || jsonSchema != null,
+ "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
+
+ // The user specified a table.
+ if (jsonTableRef != null && validate) {
+ TableReference table = getTableWithDefaultProject(options);
+
+ DatasetService datasetService = getBigQueryServices().getDatasetService(options);
+ // Check for destination table presence and emptiness for early failure notification.
+ // Note that a presence check can fail when the table or dataset is created by an earlier
+ // stage of the pipeline. For these cases the #withoutValidation method can be used to
+ // disable the check.
+ verifyDatasetPresence(datasetService, table);
+ if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
+ verifyTablePresence(datasetService, table);
+ }
+ if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
+ verifyTableEmpty(datasetService, table);
+ }
+ }
+
+ if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) {
+ // We will use BigQuery's streaming write API -- validate supported dispositions.
+ checkArgument(
+ createDisposition != CreateDisposition.CREATE_NEVER,
+ "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when"
+ + " using a tablespec function.");
+
+ checkArgument(
+ writeDisposition != WriteDisposition.WRITE_TRUNCATE,
+ "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or"
+ + " when using a tablespec function.");
+ } else {
+ // We will use a BigQuery load job -- validate the temp location.
+ String tempLocation = options.getTempLocation();
+ checkArgument(
+ !Strings.isNullOrEmpty(tempLocation),
+ "BigQueryIO.Write needs a GCS temp location to store temp files.");
+ if (bigQueryServices == null) {
+ try {
+ GcsPath.fromUri(tempLocation);
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
+ tempLocation),
+ e);
+ }
+ }
+ }
+ }
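+
+ // For the bounded, load-job path the pipeline needs a GCS staging area, typically
+ // supplied as a pipeline option such as (value is illustrative):
+ // --tempLocation=gs://my-bucket/tmp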
+
+ @Override
+ public PDone apply(PCollection<TableRow> input) {
+ BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ // In a streaming job, or when a tablespec function is defined, we use StreamWithDeDup
+ // and BigQuery's streaming import API.
+ if (options.isStreaming() || tableRefFunction != null) {
+ return input.apply(new StreamWithDeDup(getTable(), tableRefFunction, getSchema()));
+ }
+
+ TableReference table = fromJsonString(jsonTableRef, TableReference.class);
+ if (Strings.isNullOrEmpty(table.getProjectId())) {
+ table.setProjectId(options.getProject());
+ }
+ String jobIdToken = randomUUIDString();
+ String tempLocation = options.getTempLocation();
+ String tempFilePrefix;
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ tempFilePrefix = factory.resolve(
+ factory.resolve(tempLocation, "BigQuerySinkTemp"),
+ jobIdToken);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve BigQuery temp location in %s", tempLocation),
+ e);
+ }
+
+ BigQueryServices bqServices = getBigQueryServices();
+ return input.apply("Write", org.apache.beam.sdk.io.Write.to(
+ new BigQuerySink(
+ jobIdToken,
+ table,
+ jsonSchema,
+ getWriteDisposition(),
+ getCreateDisposition(),
+ tempFilePrefix,
+ input.getCoder(),
+ bqServices)));
+ }
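+
+ // Sketch of the resulting staging layout (paths are illustrative): with a temp
+ // location of gs://my-bucket/tmp and a jobIdToken of "abc123", temp files are
+ // written under gs://my-bucket/tmp/BigQuerySinkTemp/abc123 before the load job runs.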
+
+ @Override
+ protected Coder<Void> getDefaultOutputCoder() {
+ return VoidCoder.of();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull(DisplayData.item("table", jsonTableRef)
+ .withLabel("Table Reference"))
+ .addIfNotNull(DisplayData.item("schema", jsonSchema)
+ .withLabel("Table Schema"));
+
+ if (tableRefFunction != null) {
+ builder.add(DisplayData.item("tableFn", tableRefFunction.getClass())
+ .withLabel("Table Reference Function"));
+ }
+
+ builder
+ .add(DisplayData.item("createDisposition", createDisposition.toString())
+ .withLabel("Table CreateDisposition"))
+ .add(DisplayData.item("writeDisposition", writeDisposition.toString())
+ .withLabel("Table WriteDisposition"))
+ .addIfNotDefault(DisplayData.item("validation", validate)
+ .withLabel("Validation Enabled"), true);
+ }
+
+ /** Returns the create disposition. */
+ public CreateDisposition getCreateDisposition() {
+ return createDisposition;
+ }
+
+ /** Returns the write disposition. */
+ public WriteDisposition getWriteDisposition() {
+ return writeDisposition;
+ }
+
+ /** Returns the table schema. */
+ public TableSchema getSchema() {
+ return fromJsonString(jsonSchema, TableSchema.class);
+ }
+
+ /**
+ * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}.
+ *
+ * <p>If the table's project is not specified, use the executing project.
+ */
+ @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
+ TableReference table = getTable();
+ if (table != null && Strings.isNullOrEmpty(table.getProjectId())) {
+ // If the user does not specify a project, assume the table is located in
+ // the default project.
+ table.setProjectId(bqOptions.getProject());
+ }
+ return table;
+ }
+
+ /** Returns the table reference, or {@code null}. */
+ @Nullable
+ public TableReference getTable() {
+ return fromJsonString(jsonTableRef, TableReference.class);
+ }
+
+ /** Returns {@code true} if table validation is enabled. */
+ public boolean getValidate() {
+ return validate;
+ }
+
+ private BigQueryServices getBigQueryServices() {
+ if (bigQueryServices == null) {
+ bigQueryServices = new BigQueryServicesImpl();
+ }
+ return bigQueryServices;
+ }
+ }
+
+ /** Disallow construction of utility class. */
+ private Write() {}
+ }
+
+ /**
+ * {@link BigQuerySink} is implemented as a {@link FileBasedSink}.
+ *
+ * <p>It uses a BigQuery load job to import files into BigQuery.
+ */
+ static class BigQuerySink extends FileBasedSink<TableRow> {
+ private final String jobIdToken;
+ @Nullable private final String jsonTable;
+ @Nullable private final String jsonSchema;
+ private final WriteDisposition writeDisposition;
+ private final CreateDisposition createDisposition;
+ private final Coder<TableRow> coder;
+ private final BigQueryServices bqServices;
+
+ public BigQuerySink(
+ String jobIdToken,
+ @Nullable TableReference table,
+ @Nullable String jsonSchema,
+ WriteDisposition writeDisposition,
+ CreateDisposition createDisposition,
+ String tempFile,
+ Coder<TableRow> coder,
+ BigQueryServices bqServices) {
+ super(tempFile, ".json");
+ this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+ if (table == null) {
+ this.jsonTable = null;
+ } else {
+ checkArgument(!Strings.isNullOrEmpty(table.getProjectId()),
+ "Table %s should have a project specified", table);
+ this.jsonTable = toJsonString(table);
+ }
+ this.jsonSchema = jsonSchema;
+ this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition");
+ this.createDisposition = checkNotNull(createDisposition, "createDisposition");
+ this.coder = checkNotNull(coder, "coder");
+ this.bqServices = checkNotNull(bqServices, "bqServices");
+ }
+
+ @Override
+ public FileBasedSink.FileBasedWriteOperation<TableRow> createWriteOperation(
+ PipelineOptions options) {
+ return new BigQueryWriteOperation(this);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder
+ .addIfNotNull(DisplayData.item("schema", jsonSchema)
+ .withLabel("Table Schema"))
+ .addIfNotNull(DisplayData.item("tableSpec", jsonTable)
+ .withLabel("Table Specification"));
+ }
+
+ private static class BigQueryWriteOperation extends FileBasedWriteOperation<TableRow> {
+ // The maximum number of load job attempts before failing.
+ private static final int MAX_RETRY_LOAD_JOBS = 3;
+
+ // The maximum number of retries to poll the status of a load job.
+ // It is set to {@code Integer.MAX_VALUE} so that polling blocks until the BigQuery job finishes.
+ private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+ private final BigQuerySink bigQuerySink;
+
+ private BigQueryWriteOperation(BigQuerySink sink) {
+ super(checkNotNull(sink, "sink"));
+ this.bigQuerySink = sink;
+ }
+
+ @Override
+ public FileBasedWriter<TableRow> createWriter(PipelineOptions options) throws Exception {
+ return new TableRowWriter(this, bigQuerySink.coder);
+ }
+
+ @Override
+ public void finalize(Iterable<FileResult> writerResults, PipelineOptions options)
+ throws IOException, InterruptedException {
+ try {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ List<String> tempFiles = Lists.newArrayList();
+ for (FileResult result : writerResults) {
+ tempFiles.add(result.getFilename());
+ }
+ if (!tempFiles.isEmpty()) {
+ load(
+ bigQuerySink.bqServices.getJobService(bqOptions),
+ bigQuerySink.jobIdToken,
+ fromJsonString(bigQuerySink.jsonTable, TableReference.class),
+ tempFiles,
+ fromJsonString(bigQuerySink.jsonSchema, TableSchema.class),
+ bigQuerySink.writeDisposition,
+ bigQuerySink.createDisposition);
+ }
+ } finally {
+ removeTemporaryFiles(options);
+ }
+ }
+
+ /**
+ * Imports files into BigQuery via load jobs.
+ *
+ * <p>Returns normally if the files are successfully loaded into BigQuery.
+ * Throws a {@link RuntimeException} if:
+ * 1. The status of a load job is UNKNOWN (this avoids duplicating data).
+ * 2. The number of load job attempts exceeds {@code MAX_RETRY_LOAD_JOBS}.
+ *
+ * <p>If a load job fails, another load job is attempted with a different job id.
+ */
+ private void load(
+ JobService jobService,
+ String jobIdPrefix,
+ TableReference ref,
+ List<String> gcsUris,
+ @Nullable TableSchema schema,
+ WriteDisposition writeDisposition,
+ CreateDisposition createDisposition) throws InterruptedException, IOException {
+ JobConfigurationLoad loadConfig = new JobConfigurationLoad()
+ .setSourceUris(gcsUris)
+ .setDestinationTable(ref)
+ .setSchema(schema)
+ .setWriteDisposition(writeDisposition.name())
+ .setCreateDisposition(createDisposition.name())
+ .setSourceFormat("NEWLINE_DELIMITED_JSON");
+
+ boolean retrying = false;
+ String projectId = ref.getProjectId();
+ for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) {
+ String jobId = jobIdPrefix + "-" + i;
+ if (retrying) {
+ LOG.info("Previous load jobs failed, retrying.");
+ }
+ LOG.info("Starting BigQuery load job: {}", jobId);
+ JobReference jobRef = new JobReference()
+ .setProjectId(projectId)
+ .setJobId(jobId);
+ jobService.startLoadJob(jobRef, loadConfig);
+ Status jobStatus =
+ parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES));
+ switch (jobStatus) {
+ case SUCCEEDED:
+ return;
+ case UNKNOWN:
+ throw new RuntimeException("Failed to poll the load job status.");
+ case FAILED:
+ LOG.info("BigQuery load job failed: {}", jobId);
+ retrying = true;
+ continue;
+ default:
+ throw new IllegalStateException("Unexpected job status: " + jobStatus);
+ }
+ }
+ throw new RuntimeException(
+ "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS);
+ }
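+
+ // Retry attempts reuse the job id prefix with an attempt suffix, so a jobIdPrefix
+ // of "abc123" (illustrative) yields job ids "abc123-0", "abc123-1", "abc123-2".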
+ }
+
+ private static class TableRowWriter extends FileBasedWriter<TableRow> {
+ private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
+ private final Coder<TableRow> coder;
+ private OutputStream out;
+
+ public TableRowWriter(
+ FileBasedWriteOperation<TableRow> writeOperation, Coder<TableRow> coder) {
+ super(writeOperation);
+ this.mimeType = MimeTypes.TEXT;
+ this.coder = coder;
+ }
+
+ @Override
+ protected void prepareWrite(WritableByteChannel channel) throws Exception {
+ out = Channels.newOutputStream(channel);
+ }
+
+ @Override
+ public void write(TableRow value) throws Exception {
+ // Use Context.OUTER to encode each value as a standalone JSON object, and NEWLINE as the delimiter.
+ coder.encode(value, out, Context.OUTER);
+ out.write(NEWLINE);
+ }
+ }
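+
+ // Each element becomes one JSON line in the staged temp file, e.g. (rows are illustrative):
+ // {"word":"apache","count":3}
+ // {"word":"beam","count":5}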
+ }
+
+ private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
+ try {
+ datasetService.getDataset(table.getProjectId(), table.getDatasetId());
+ } catch (Exception e) {
+ ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+ if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
+ throw new IllegalArgumentException(
+ String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)),
+ e);
+ } else {
+ throw new RuntimeException(
+ String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset",
+ BigQueryIO.toTableSpec(table)),
+ e);
+ }
+ }
+ }
+
+ private static void verifyTablePresence(DatasetService datasetService, TableReference table) {
+ try {
+ datasetService.getTable(table.getProjectId(), table.getDatasetId(), table.getTableId());
+ } catch (Exception e) {
+ ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
+ if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
+ throw new IllegalArgumentException(
+ String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e);
+ } else {
+ throw new RuntimeException(
+ String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table",
+ BigQueryIO.toTableSpec(table)),
+ e);
+ }
+ }
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * Implementation of a {@link DoFn} that performs streaming BigQuery writes.
+ */
+ @SystemDoFnInternal
+ private static class StreamingWriteFn
+ extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
+ /** TableSchema in JSON. Use String to make the class Serializable. */
+ private final String jsonTableSchema;
+
+ /** BigQuery rows to be written, accumulated per table spec so that writes can be batched. */
+ private transient Map<String, List<TableRow>> tableRows;
+
+ /** The unique ids for each accumulated BigQuery row, keyed by table spec. */
+ private transient Map<String, List<String>> uniqueIdsForTableRows;
+
+ /** The set of tables created so far, so that we do not attempt to create each table repeatedly. */
+ private static Set<String> createdTables =
+ Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+
+ /** Tracks bytes written, exposed as "ByteCount" Counter. */
+ private Aggregator<Long, Long> byteCountAggregator =
+ createAggregator("ByteCount", new Sum.SumLongFn());
+
+ /** Constructor. */
+ StreamingWriteFn(TableSchema schema) {
+ jsonTableSchema = toJsonString(schema);
+ }
+
+ /** Initializes the per-bundle accumulation of rows and unique ids. */
+ @Override
+ public void startBundle(Context context) {
+ tableRows = new HashMap<>();
+ uniqueIdsForTableRows = new HashMap<>();
+ }
+
+ /** Accumulates the input into tableRows and uniqueIdsForTableRows. */
+ @Override
+ public void processElement(ProcessContext context) {
+ String tableSpec = context.element().getKey().getKey();
+ List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec);
+ List<String> uniqueIds = getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec);
+
+ rows.add(context.element().getValue().tableRow);
+ uniqueIds.add(context.element().getValue().uniqueId);
+ }
+
+ /** Writes the accumulated rows into BigQuery with streaming API. */
+ @Override
+ public void finishBundle(Context context) throws Exception {
+ BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
+ Bigquery client = Transport.newBigQueryClient(options).build();
+
+ for (Map.Entry<String, List<TableRow>> entry : tableRows.entrySet()) {
+ TableReference tableReference = getOrCreateTable(options, entry.getKey());
+ flushRows(client, tableReference, entry.getValue(),
+ uniqueIdsForTableRows.get(entry.getKey()), options);
+ }
+ tableRows.clear();
+ uniqueIdsForTableRows.clear();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+
+ builder.addIfNotNull(DisplayData.item("schema", jsonTableSchema)
+ .withLabel("Table Schema"));
+ }
+
+ public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
+ throws IOException {
+ TableReference tableReference = parseTableSpec(tableSpec);
+ if (!createdTables.contains(tableSpec)) {
+ synchronized (createdTables) {
+ // Another thread may have succeeded in creating the table in the meanwhile, so
+ // check again. This check isn't needed for correctness, but we add it to prevent
+ // every thread from attempting a create and overwhelming our BigQuery quota.
+ if (!createdTables.contains(tableSpec)) {
+ TableSchema tableSchema = JSON_FACTORY.fromString(jsonTableSchema, TableSchema.class);
+ Bigquery client = Transport.newBigQueryClient(options).build();
+ BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
+ inserter.getOrCreateTable(tableReference, WriteDisposition.WRITE_APPEND,
+ CreateDisposition.CREATE_IF_NEEDED, tableSchema);
+ createdTables.add(tableSpec);
+ }
+ }
+ }
+ return tableReference;
+ }
+
+ /** Writes the accumulated rows into BigQuery with streaming API. */
+ private void flushRows(Bigquery client, TableReference tableReference,
+ List<TableRow> tableRows, List<String> uniqueIds, BigQueryOptions options) {
+ if (!tableRows.isEmpty()) {
+ try {
+ BigQueryTableInserter inserter = new BigQueryTableInserter(client, options);
+ inserter.insertAll(tableReference, tableRows, uniqueIds, byteCountAggregator);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ private static class ShardedKey<K> {
+ private final K key;
+ private final int shardNumber;
+
+ public static <K> ShardedKey<K> of(K key, int shardNumber) {
+ return new ShardedKey<K>(key, shardNumber);
+ }
+
+ private ShardedKey(K key, int shardNumber) {
+ this.key = key;
+ this.shardNumber = shardNumber;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public int getShardNumber() {
+ return shardNumber;
+ }
+ }
+
+ /**
+ * A {@link Coder} for {@link ShardedKey}, using a wrapped key {@link Coder}.
+ */
+ private static class ShardedKeyCoder<KeyT>
+ extends StandardCoder<ShardedKey<KeyT>> {
+ public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
+ return new ShardedKeyCoder<>(keyCoder);
+ }
+
+ @JsonCreator
+ public static <KeyT> ShardedKeyCoder<KeyT> of(
+ @JsonProperty(PropertyNames.COMPONENT_ENCODINGS)
+ List<Coder<KeyT>> components) {
+ checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size());
+ return of(components.get(0));
+ }
+
+ protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
+ this.keyCoder = keyCoder;
+ this.shardNumberCoder = VarIntCoder.of();
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Arrays.asList(keyCoder);
+ }
+
+ @Override
+ public void encode(ShardedKey<KeyT> key, OutputStream outStream, Context context)
+ throws IOException {
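+ // The key is encoded in a nested context so that its length is self-delimiting;
+ // the shard number, as the final component, reuses the caller's context.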
+ keyCoder.encode(key.getKey(), outStream, context.nested());
+ shardNumberCoder.encode(key.getShardNumber(), outStream, context);
+ }
+
+ @Override
+ public ShardedKey<KeyT> decode(InputStream inStream, Context context)
+ throws IOException {
+ return new ShardedKey<KeyT>(
+ keyCoder.decode(inStream, context.nested()),
+ shardNumberCoder.decode(inStream, context));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ keyCoder.verifyDeterministic();
+ }
+
+ Coder<KeyT> keyCoder;
+ VarIntCoder shardNumberCoder;
+ }
+
+ private static class TableRowInfoCoder extends AtomicCoder<TableRowInfo> {
+ private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
+
+ @JsonCreator
+ public static TableRowInfoCoder of() {
+ return INSTANCE;
+ }
+
+ @Override
+ public void encode(TableRowInfo value, OutputStream outStream, Context context)
+ throws IOException {
+ if (value == null) {
+ throw new CoderException("cannot encode a null value");
+ }
+ tableRowCoder.encode(value.tableRow, outStream, context.nested());
+ idCoder.encode(value.uniqueId, outStream, context.nested());
+ }
+
+ @Override
+ public TableRowInfo decode(InputStream inStream, Context context)
+ throws IOException {
+ return new TableRowInfo(
+ tableRowCoder.decode(inStream, context.nested()),
+ idCoder.decode(inStream, context.nested()));
+ }
+
+ @Override
+ public void verifyDeterministic() throws NonDeterministicException {
+ throw new NonDeterministicException(this, "TableRows are not deterministic.");
+ }
+
+ TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of()
<TRUNCATED>