You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/03/28 15:51:47 UTC
[5/6] beam git commit: Refactor BigQueryIO helper methods into a
BigQueryHelpers class. Move helper transforms for BigQueryIO.Read into
individual files. Change access modifiers from private to package-private in BigQueryHelpers.
Refactor BigQueryIO helper methods into a BigQueryHelpers class.
Move helper transforms for BigQueryIO.Read into individual files.
Change access modifiers from private to package-private in BigQueryHelpers.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2cc2e81f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2cc2e81f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2cc2e81f
Branch: refs/heads/master
Commit: 2cc2e81f241e1d5d590f12d4a8491273908e2cce
Parents: 434eadb
Author: Reuven Lax <re...@google.com>
Authored: Fri Mar 17 15:15:16 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Mar 28 08:46:15 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/bigquery/BigQueryHelpers.java | 243 ++++++
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 853 ++-----------------
.../io/gcp/bigquery/BigQueryQuerySource.java | 186 ++++
.../io/gcp/bigquery/BigQueryServicesImpl.java | 2 +-
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 178 ++++
.../io/gcp/bigquery/BigQueryTableSource.java | 86 ++
.../io/gcp/bigquery/PassThroughThenCleanup.java | 66 ++
.../sdk/io/gcp/bigquery/TransformingSource.java | 118 +++
.../sdk/io/gcp/bigquery/BigQueryIOTest.java | 54 +-
.../sdk/io/gcp/bigquery/BigQueryUtilTest.java | 12 +-
10 files changed, 970 insertions(+), 828 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
new file mode 100644
index 0000000..9fba938
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -0,0 +1,243 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+
+
+/**
+ * A set of helper functions and classes used by {@link BigQueryIO}.
+ */
+public class BigQueryHelpers {
+ @Nullable
+ /**
+ * Return a displayable string representation for a {@link TableReference}.
+ */
+ static ValueProvider<String> displayTable(
+ @Nullable ValueProvider<TableReference> table) {
+ if (table == null) {
+ return null;
+ }
+ return NestedValueProvider.of(table, new TableRefToTableSpec());
+ }
+
+ /**
+ * Returns a canonical string representation of the {@link TableReference}.
+ */
+ static String toTableSpec(TableReference ref) {
+ StringBuilder sb = new StringBuilder();
+ if (ref.getProjectId() != null) {
+ sb.append(ref.getProjectId());
+ sb.append(":");
+ }
+
+ sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
+ return sb.toString();
+ }
+
+ static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
+ List<V> value = map.get(key);
+ if (value == null) {
+ value = new ArrayList<>();
+ map.put(key, value);
+ }
+ return value;
+ }
+
+ /**
+ * Parse a table specification in the form
+ * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}.
+ *
+ * <p>If the project id is omitted, the default project id is used.
+ */
+ static TableReference parseTableSpec(String tableSpec) {
+ Matcher match = BigQueryIO.TABLE_SPEC.matcher(tableSpec);
+ if (!match.matches()) {
+ throw new IllegalArgumentException(
+ "Table reference is not in [project_id]:[dataset_id].[table_id] "
+ + "format: " + tableSpec);
+ }
+
+ TableReference ref = new TableReference();
+ ref.setProjectId(match.group("PROJECT"));
+
+ return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
+ }
+
+ static String jobToPrettyString(@Nullable Job job) throws IOException {
+ return job == null ? "null" : job.toPrettyString();
+ }
+
+ static String statusToPrettyString(@Nullable JobStatus status) throws IOException {
+ return status == null ? "Unknown status: null." : status.toPrettyString();
+ }
+
+ static Status parseStatus(@Nullable Job job) {
+ if (job == null) {
+ return Status.UNKNOWN;
+ }
+ JobStatus status = job.getStatus();
+ if (status.getErrorResult() != null) {
+ return Status.FAILED;
+ } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
+ return Status.FAILED;
+ } else {
+ return Status.SUCCEEDED;
+ }
+ }
+
+ @VisibleForTesting
+ static String toJsonString(Object item) {
+ if (item == null) {
+ return null;
+ }
+ try {
+ return BigQueryIO.JSON_FACTORY.toString(item);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()),
+ e);
+ }
+ }
+
+ @VisibleForTesting
+ static <T> T fromJsonString(String json, Class<T> clazz) {
+ if (json == null) {
+ return null;
+ }
+ try {
+ return BigQueryIO.JSON_FACTORY.fromString(json, clazz);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json),
+ e);
+ }
+ }
+
+ /**
+ * Returns a randomUUID string.
+ *
+ * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset id.
+ */
+ static String randomUUIDString() {
+ return UUID.randomUUID().toString().replaceAll("-", "");
+ }
+
+ @VisibleForTesting
+ static class JsonSchemaToTableSchema
+ implements SerializableFunction<String, TableSchema> {
+ @Override
+ public TableSchema apply(String from) {
+ return fromJsonString(from, TableSchema.class);
+ }
+ }
+
+ @VisibleForTesting
+ static class BeamJobUuidToBigQueryJobUuid
+ implements SerializableFunction<String, String> {
+ @Override
+ public String apply(String from) {
+ return "beam_job_" + from;
+ }
+ }
+
+ static class TableSchemaToJsonSchema
+ implements SerializableFunction<TableSchema, String> {
+ @Override
+ public String apply(TableSchema from) {
+ return toJsonString(from);
+ }
+ }
+
+ static class JsonTableRefToTableRef
+ implements SerializableFunction<String, TableReference> {
+ @Override
+ public TableReference apply(String from) {
+ return fromJsonString(from, TableReference.class);
+ }
+ }
+
+ static class TableRefToTableSpec
+ implements SerializableFunction<TableReference, String> {
+ @Override
+ public String apply(TableReference from) {
+ return toTableSpec(from);
+ }
+ }
+
+ static class TableRefToJson
+ implements SerializableFunction<TableReference, String> {
+ @Override
+ public String apply(TableReference from) {
+ return toJsonString(from);
+ }
+ }
+
+ static class TableRefToProjectId
+ implements SerializableFunction<TableReference, String> {
+ @Override
+ public String apply(TableReference from) {
+ return from.getProjectId();
+ }
+ }
+
+ @VisibleForTesting
+ static class TableSpecToTableRef
+ implements SerializableFunction<String, TableReference> {
+ @Override
+ public TableReference apply(String from) {
+ return parseTableSpec(from);
+ }
+ }
+
+ @VisibleForTesting
+ static class CreatePerBeamJobUuid
+ implements SerializableFunction<String, String> {
+ private final String stepUuid;
+
+ CreatePerBeamJobUuid(String stepUuid) {
+ this.stepUuid = stepUuid;
+ }
+
+ @Override
+ public String apply(String jobUuid) {
+ return stepUuid + "_" + jobUuid.replaceAll("-", "");
+ }
+ }
+
+ @VisibleForTesting
+ static class CreateJsonTableRefFromUuid
+ implements SerializableFunction<String, TableReference> {
+ private final String executingProject;
+
+ public CreateJsonTableRefFromUuid(String executingProject) {
+ this.executingProject = executingProject;
+ }
+
+ @Override
+ public TableReference apply(String jobUuid) {
+ String queryTempDatasetId = "temp_dataset_" + jobUuid;
+ String queryTempTableId = "temp_table_" + jobUuid;
+ TableReference queryTempTableRef = new TableReference()
+ .setProjectId(executingProject)
+ .setDatasetId(queryTempDatasetId)
+ .setTableId(queryTempTableId);
+ return queryTempTableRef;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index d195afd..3f2f3e8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -25,13 +25,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.Job;
-import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
-import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
@@ -39,38 +37,31 @@ import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
import java.io.OutputStream;
-import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
-import org.apache.avro.generic.GenericRecord;
+
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
@@ -82,8 +73,16 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.TableRowJsonCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.BeamJobUuidToBigQueryJobUuid;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreateJsonTableRefFromUuid;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.CreatePerBeamJobUuid;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToTableSpec;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.metrics.Counter;
@@ -131,7 +130,6 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.ValueInSingleWindow;
-import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -153,7 +151,7 @@ import org.slf4j.LoggerFactory;
* from the <a href="https://cloud.google.com/bigquery/client-libraries">
* BigQuery Java Client API</a>.
* Tables can be referred to as Strings, with or without the {@code projectId}.
- * A helper function is provided ({@link BigQueryIO#parseTableSpec(String)})
+ * A helper function is provided ({@link BigQueryHelpers#parseTableSpec(String)})
* that parses the following string forms into a {@link TableReference}:
*
* <ul>
@@ -253,7 +251,7 @@ public class BigQueryIO {
* Singleton instance of the JSON factory used to read and write JSON
* formatted rows.
*/
- private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
+ static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
/**
* Project IDs must contain 6-63 lowercase letters, digits, or dashes.
@@ -281,7 +279,7 @@ public class BigQueryIO {
String.format("((?<PROJECT>%s):)?(?<DATASET>%s)\\.(?<TABLE>%s)", PROJECT_ID_REGEXP,
DATASET_REGEXP, TABLE_REGEXP);
- private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
+ static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
private static final String RESOURCE_NOT_FOUND_ERROR =
"BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
@@ -293,152 +291,6 @@ public class BigQueryIO {
+ " an earlier stage of the pipeline, this validation can be disabled using"
+ " #withoutValidation.";
- /**
- * Parse a table specification in the form
- * {@code "[project_id]:[dataset_id].[table_id]"} or {@code "[dataset_id].[table_id]"}.
- *
- * <p>If the project id is omitted, the default project id is used.
- */
- public static TableReference parseTableSpec(String tableSpec) {
- Matcher match = TABLE_SPEC.matcher(tableSpec);
- if (!match.matches()) {
- throw new IllegalArgumentException(
- "Table reference is not in [project_id]:[dataset_id].[table_id] "
- + "format: " + tableSpec);
- }
-
- TableReference ref = new TableReference();
- ref.setProjectId(match.group("PROJECT"));
-
- return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
- }
-
- /**
- * Returns a canonical string representation of the {@link TableReference}.
- */
- public static String toTableSpec(TableReference ref) {
- StringBuilder sb = new StringBuilder();
- if (ref.getProjectId() != null) {
- sb.append(ref.getProjectId());
- sb.append(":");
- }
-
- sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
- return sb.toString();
- }
-
- @VisibleForTesting
- static class JsonSchemaToTableSchema
- implements SerializableFunction<String, TableSchema> {
- @Override
- public TableSchema apply(String from) {
- return fromJsonString(from, TableSchema.class);
- }
- }
-
- private static class TableSchemaToJsonSchema
- implements SerializableFunction<TableSchema, String> {
- @Override
- public String apply(TableSchema from) {
- return toJsonString(from);
- }
- }
-
- private static class JsonTableRefToTableRef
- implements SerializableFunction<String, TableReference> {
- @Override
- public TableReference apply(String from) {
- return fromJsonString(from, TableReference.class);
- }
- }
-
- private static class TableRefToTableSpec
- implements SerializableFunction<TableReference, String> {
- @Override
- public String apply(TableReference from) {
- return toTableSpec(from);
- }
- }
-
- private static class TableRefToJson
- implements SerializableFunction<TableReference, String> {
- @Override
- public String apply(TableReference from) {
- return toJsonString(from);
- }
- }
-
- private static class TableRefToProjectId
- implements SerializableFunction<TableReference, String> {
- @Override
- public String apply(TableReference from) {
- return from.getProjectId();
- }
- }
-
- @VisibleForTesting
- static class TableSpecToTableRef
- implements SerializableFunction<String, TableReference> {
- @Override
- public TableReference apply(String from) {
- return parseTableSpec(from);
- }
- }
-
- @VisibleForTesting
- static class BeamJobUuidToBigQueryJobUuid
- implements SerializableFunction<String, String> {
- @Override
- public String apply(String from) {
- return "beam_job_" + from;
- }
- }
-
- @VisibleForTesting
- static class CreatePerBeamJobUuid
- implements SerializableFunction<String, String> {
- private final String stepUuid;
-
- private CreatePerBeamJobUuid(String stepUuid) {
- this.stepUuid = stepUuid;
- }
-
- @Override
- public String apply(String jobUuid) {
- return stepUuid + "_" + jobUuid.replaceAll("-", "");
- }
- }
-
- @VisibleForTesting
- static class CreateJsonTableRefFromUuid
- implements SerializableFunction<String, TableReference> {
- private final String executingProject;
-
- private CreateJsonTableRefFromUuid(String executingProject) {
- this.executingProject = executingProject;
- }
-
- @Override
- public TableReference apply(String jobUuid) {
- String queryTempDatasetId = "temp_dataset_" + jobUuid;
- String queryTempTableId = "temp_table_" + jobUuid;
- TableReference queryTempTableRef = new TableReference()
- .setProjectId(executingProject)
- .setDatasetId(queryTempDatasetId)
- .setTableId(queryTempTableId);
- return queryTempTableRef;
- }
- }
-
- @Nullable
- private static ValueProvider<String> displayTable(
- @Nullable ValueProvider<TableReference> table) {
- if (table == null) {
- return null;
- }
- return NestedValueProvider.of(table, new TableRefToTableSpec());
- }
-
/**
* A formatting function that maps a TableRow to itself. This allows sending a
@@ -556,7 +408,7 @@ public class BigQueryIO {
* Read from table specified by a {@link TableReference}.
*/
public Read from(TableReference table) {
- return from(StaticValueProvider.of(toTableSpec(table)));
+ return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
}
private static final String QUERY_VALIDATION_FAILURE_ERROR =
@@ -672,7 +524,7 @@ public class BigQueryIO {
@Override
public PCollection<TableRow> expand(PBegin input) {
- String stepUuid = randomUUIDString();
+ String stepUuid = BigQueryHelpers.randomUUIDString();
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
ValueProvider<String> jobUuid = NestedValueProvider.of(
StaticValueProvider.of(bqOptions.getJobName()), new CreatePerBeamJobUuid(stepUuid));
@@ -752,7 +604,7 @@ public class BigQueryIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
- .addIfNotNull(DisplayData.item("table", displayTable(getTableProvider()))
+ .addIfNotNull(DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider()))
.withLabel("Table"))
.addIfNotNull(DisplayData.item("query", getQuery())
.withLabel("Query"))
@@ -787,7 +639,7 @@ public class BigQueryIO {
TableReference tableRef = table.get();
tableRef.setProjectId(bqOptions.getProject());
return NestedValueProvider.of(StaticValueProvider.of(
- toJsonString(tableRef)), new JsonTableRefToTableRef());
+ BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef());
}
return table;
}
@@ -810,541 +662,15 @@ public class BigQueryIO {
}
}
- /**
- * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
- * has been processed.
- */
- @VisibleForTesting
- static class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
- private CleanupOperation cleanupOperation;
-
- PassThroughThenCleanup(CleanupOperation cleanupOperation) {
- this.cleanupOperation = cleanupOperation;
- }
-
- @Override
- public PCollection<T> expand(PCollection<T> input) {
- TupleTag<T> mainOutput = new TupleTag<>();
- TupleTag<Void> cleanupSignal = new TupleTag<>();
- PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
- .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
-
- PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
- .setCoder(VoidCoder.of())
- .apply(View.<Void>asSingleton().withDefaultValue(null));
-
- input.getPipeline()
- .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
- .apply("Cleanup", ParDo.of(
- new DoFn<CleanupOperation, Void>() {
- @ProcessElement
- public void processElement(ProcessContext c)
- throws Exception {
- c.element().cleanup(c.getPipelineOptions());
- }
- }).withSideInputs(cleanupSignalView));
-
- return outputs.get(mainOutput);
- }
-
- private static class IdentityFn<T> extends DoFn<T, T> {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- }
-
- abstract static class CleanupOperation implements Serializable {
- abstract void cleanup(PipelineOptions options) throws Exception;
- }
- }
-
- /**
- * A {@link BigQuerySourceBase} for reading BigQuery tables.
- */
- @VisibleForTesting
- static class BigQueryTableSource extends BigQuerySourceBase {
-
- static BigQueryTableSource create(
- ValueProvider<String> jobIdToken,
- ValueProvider<TableReference> table,
- String extractDestinationDir,
- BigQueryServices bqServices,
- ValueProvider<String> executingProject) {
- return new BigQueryTableSource(
- jobIdToken, table, extractDestinationDir, bqServices, executingProject);
- }
-
- private final ValueProvider<String> jsonTable;
- private final AtomicReference<Long> tableSizeBytes;
-
- private BigQueryTableSource(
- ValueProvider<String> jobIdToken,
- ValueProvider<TableReference> table,
- String extractDestinationDir,
- BigQueryServices bqServices,
- ValueProvider<String> executingProject) {
- super(jobIdToken, extractDestinationDir, bqServices, executingProject);
- this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
- this.tableSizeBytes = new AtomicReference<>();
- }
-
- @Override
- protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
- checkState(jsonTable.isAccessible());
- return JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
- }
-
- @Override
- public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- checkState(jsonTable.isAccessible());
- TableReference tableRef = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
- return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
- }
-
- @Override
- public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- if (tableSizeBytes.get() == null) {
- TableReference table = JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
-
- Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
- .getTable(table).getNumBytes();
- tableSizeBytes.compareAndSet(null, numBytes);
- }
- return tableSizeBytes.get();
- }
-
- @Override
- protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
- // Do nothing.
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("table", jsonTable));
- }
- }
-
- /**
- * A {@link BigQuerySourceBase} for querying BigQuery tables.
- */
- @VisibleForTesting
- static class BigQueryQuerySource extends BigQuerySourceBase {
-
- static BigQueryQuerySource create(
- ValueProvider<String> jobIdToken,
- ValueProvider<String> query,
- ValueProvider<TableReference> queryTempTableRef,
- Boolean flattenResults,
- Boolean useLegacySql,
- String extractDestinationDir,
- BigQueryServices bqServices) {
- return new BigQueryQuerySource(
- jobIdToken,
- query,
- queryTempTableRef,
- flattenResults,
- useLegacySql,
- extractDestinationDir,
- bqServices);
- }
-
- private final ValueProvider<String> query;
- private final ValueProvider<String> jsonQueryTempTable;
- private final Boolean flattenResults;
- private final Boolean useLegacySql;
- private transient AtomicReference<JobStatistics> dryRunJobStats;
-
- private BigQueryQuerySource(
- ValueProvider<String> jobIdToken,
- ValueProvider<String> query,
- ValueProvider<TableReference> queryTempTableRef,
- Boolean flattenResults,
- Boolean useLegacySql,
- String extractDestinationDir,
- BigQueryServices bqServices) {
- super(jobIdToken, extractDestinationDir, bqServices,
- NestedValueProvider.of(
- checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId()));
- this.query = checkNotNull(query, "query");
- this.jsonQueryTempTable = NestedValueProvider.of(
- queryTempTableRef, new TableRefToJson());
- this.flattenResults = checkNotNull(flattenResults, "flattenResults");
- this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
- this.dryRunJobStats = new AtomicReference<>();
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
- }
-
- @Override
- public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- return new BigQueryReader(this, bqServices.getReaderFromQuery(
- bqOptions, executingProject.get(), createBasicQueryConfig()));
- }
-
- @Override
- protected TableReference getTableToExtract(BigQueryOptions bqOptions)
- throws IOException, InterruptedException {
- // 1. Find the location of the query.
- String location = null;
- List<TableReference> referencedTables =
- dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
- DatasetService tableService = bqServices.getDatasetService(bqOptions);
- if (referencedTables != null && !referencedTables.isEmpty()) {
- TableReference queryTable = referencedTables.get(0);
- location = tableService.getTable(queryTable).getLocation();
- }
-
- // 2. Create the temporary dataset in the query location.
- TableReference tableToExtract =
- JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
- tableService.createDataset(
- tableToExtract.getProjectId(),
- tableToExtract.getDatasetId(),
- location,
- "Dataset for BigQuery query job temporary table");
-
- // 3. Execute the query.
- String queryJobId = jobIdToken.get() + "-query";
- executeQuery(
- executingProject.get(),
- queryJobId,
- tableToExtract,
- bqServices.getJobService(bqOptions));
- return tableToExtract;
- }
-
- @Override
- protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
- checkState(jsonQueryTempTable.isAccessible());
- TableReference tableToRemove =
- JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
-
- DatasetService tableService = bqServices.getDatasetService(bqOptions);
- tableService.deleteTable(tableToRemove);
- tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder.add(DisplayData.item("query", query));
- }
-
- private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
- throws InterruptedException, IOException {
- if (dryRunJobStats.get() == null) {
- JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
- executingProject.get(), createBasicQueryConfig());
- dryRunJobStats.compareAndSet(null, jobStats);
- }
- return dryRunJobStats.get();
- }
-
- private void executeQuery(
- String executingProject,
- String jobId,
- TableReference destinationTable,
- JobService jobService) throws IOException, InterruptedException {
- JobReference jobRef = new JobReference()
- .setProjectId(executingProject)
- .setJobId(jobId);
-
- JobConfigurationQuery queryConfig = createBasicQueryConfig()
- .setAllowLargeResults(true)
- .setCreateDisposition("CREATE_IF_NEEDED")
- .setDestinationTable(destinationTable)
- .setPriority("BATCH")
- .setWriteDisposition("WRITE_EMPTY");
-
- jobService.startQueryJob(jobRef, queryConfig);
- Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
- if (parseStatus(job) != Status.SUCCEEDED) {
- throw new IOException(String.format(
- "Query job %s failed, status: %s.", jobId, statusToPrettyString(job.getStatus())));
- }
- }
-
- private JobConfigurationQuery createBasicQueryConfig() {
- return new JobConfigurationQuery()
- .setFlattenResults(flattenResults)
- .setQuery(query.get())
- .setUseLegacySql(useLegacySql);
- }
-
- private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
- in.defaultReadObject();
- dryRunJobStats = new AtomicReference<>();
- }
- }
-
- /**
- * An abstract {@link BoundedSource} to read a table from BigQuery.
- *
- * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
- * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource},
- * and {@link BigQueryQuerySource}, depending on the configuration of the read.
- * Specifically,
- * <ul>
- * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
- * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
- * </ul>
- * ...
- */
- private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
- // The maximum number of retries to poll a BigQuery job.
- protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
-
- protected final ValueProvider<String> jobIdToken;
- protected final String extractDestinationDir;
- protected final BigQueryServices bqServices;
- protected final ValueProvider<String> executingProject;
-
- private BigQuerySourceBase(
- ValueProvider<String> jobIdToken,
- String extractDestinationDir,
- BigQueryServices bqServices,
- ValueProvider<String> executingProject) {
- this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
- this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
- this.bqServices = checkNotNull(bqServices, "bqServices");
- this.executingProject = checkNotNull(executingProject, "executingProject");
- }
-
- @Override
- public List<BoundedSource<TableRow>> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- TableReference tableToExtract = getTableToExtract(bqOptions);
- JobService jobService = bqServices.getJobService(bqOptions);
- String extractJobId = getExtractJobId(jobIdToken);
- List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
-
- TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
- .getTable(tableToExtract).getSchema();
-
- cleanupTempResource(bqOptions);
- return createSources(tempFiles, tableSchema);
- }
-
- protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
-
- protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
-
- @Override
- public void validate() {
- // Do nothing, validation is done in BigQuery.Read.
- }
-
- @Override
- public Coder<TableRow> getDefaultOutputCoder() {
- return TableRowJsonCoder.of();
- }
-
- private List<String> executeExtract(
- String jobId, TableReference table, JobService jobService)
- throws InterruptedException, IOException {
- JobReference jobRef = new JobReference()
- .setProjectId(executingProject.get())
- .setJobId(jobId);
-
- String destinationUri = getExtractDestinationUri(extractDestinationDir);
- JobConfigurationExtract extract = new JobConfigurationExtract()
- .setSourceTable(table)
- .setDestinationFormat("AVRO")
- .setDestinationUris(ImmutableList.of(destinationUri));
-
- LOG.info("Starting BigQuery extract job: {}", jobId);
- jobService.startExtractJob(jobRef, extract);
- Job extractJob =
- jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
- if (parseStatus(extractJob) != Status.SUCCEEDED) {
- throw new IOException(String.format(
- "Extract job %s failed, status: %s.",
- extractJob.getJobReference().getJobId(), statusToPrettyString(extractJob.getStatus())));
- }
-
- List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
- return ImmutableList.copyOf(tempFiles);
- }
-
- private List<BoundedSource<TableRow>> createSources(
- List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
- final String jsonSchema = JSON_FACTORY.toString(tableSchema);
-
- SerializableFunction<GenericRecord, TableRow> function =
- new SerializableFunction<GenericRecord, TableRow>() {
- @Override
- public TableRow apply(GenericRecord input) {
- return BigQueryAvroUtils.convertGenericRecordToTableRow(
- input, fromJsonString(jsonSchema, TableSchema.class));
- }};
-
- List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
- for (String fileName : files) {
- avroSources.add(new TransformingSource<>(
- AvroSource.from(fileName), function, getDefaultOutputCoder()));
- }
- return ImmutableList.copyOf(avroSources);
- }
-
- protected static class BigQueryReader extends BoundedSource.BoundedReader<TableRow> {
- private final BigQuerySourceBase source;
- private final BigQueryServices.BigQueryJsonReader reader;
-
- private BigQueryReader(
- BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
- this.source = source;
- this.reader = reader;
- }
-
- @Override
- public BoundedSource<TableRow> getCurrentSource() {
- return source;
- }
-
- @Override
- public boolean start() throws IOException {
- return reader.start();
- }
-
- @Override
- public boolean advance() throws IOException {
- return reader.advance();
- }
-
- @Override
- public TableRow getCurrent() throws NoSuchElementException {
- return reader.getCurrent();
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
- }
- }
-
- /**
- * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
- * and transforms elements to type {@code V}.
- */
- @VisibleForTesting
- static class TransformingSource<T, V> extends BoundedSource<V> {
- private final BoundedSource<T> boundedSource;
- private final SerializableFunction<T, V> function;
- private final Coder<V> outputCoder;
-
- TransformingSource(
- BoundedSource<T> boundedSource,
- SerializableFunction<T, V> function,
- Coder<V> outputCoder) {
- this.boundedSource = checkNotNull(boundedSource, "boundedSource");
- this.function = checkNotNull(function, "function");
- this.outputCoder = checkNotNull(outputCoder, "outputCoder");
- }
-
- @Override
- public List<? extends BoundedSource<V>> splitIntoBundles(
- long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
- return Lists.transform(
- boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
- new Function<BoundedSource<T>, BoundedSource<V>>() {
- @Override
- public BoundedSource<V> apply(BoundedSource<T> input) {
- return new TransformingSource<>(input, function, outputCoder);
- }
- });
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return boundedSource.getEstimatedSizeBytes(options);
- }
-
- @Override
- public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
- return new TransformingReader(boundedSource.createReader(options));
- }
-
- @Override
- public void validate() {
- boundedSource.validate();
- }
-
- @Override
- public Coder<V> getDefaultOutputCoder() {
- return outputCoder;
- }
-
- private class TransformingReader extends BoundedReader<V> {
- private final BoundedReader<T> boundedReader;
-
- private TransformingReader(BoundedReader<T> boundedReader) {
- this.boundedReader = checkNotNull(boundedReader, "boundedReader");
- }
-
- @Override
- public synchronized BoundedSource<V> getCurrentSource() {
- return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
- }
-
- @Override
- public boolean start() throws IOException {
- return boundedReader.start();
- }
-
- @Override
- public boolean advance() throws IOException {
- return boundedReader.advance();
- }
-
- @Override
- public V getCurrent() throws NoSuchElementException {
- T current = boundedReader.getCurrent();
- return function.apply(current);
- }
-
- @Override
- public void close() throws IOException {
- boundedReader.close();
- }
-
- @Override
- public synchronized BoundedSource<V> splitAtFraction(double fraction) {
- BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
- return split == null ? null : new TransformingSource<>(split, function, outputCoder);
- }
-
- @Override
- public Double getFractionConsumed() {
- return boundedReader.getFractionConsumed();
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return boundedReader.getCurrentTimestamp();
- }
- }
- }
-
- private static String getExtractJobId(ValueProvider<String> jobIdToken) {
+ static String getExtractJobId(ValueProvider<String> jobIdToken) {
return jobIdToken.get() + "-extract";
}
- private static String getExtractDestinationUri(String extractDestinationDir) {
+ static String getExtractDestinationUri(String extractDestinationDir) {
return String.format("%s/%s", extractDestinationDir, "*.avro");
}
- private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+ static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
throws IOException {
JobStatistics jobStats = extractJob.getStatistics();
List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
@@ -1538,7 +864,8 @@ public class BigQueryIO {
}
/**
- * Writes to the given table, specified in the format described in {@link #parseTableSpec}.
+ * Writes to the given table, specified in the format described in
+ * {@link BigQueryHelpers#parseTableSpec}.
*/
public Write<T> to(String tableSpec) {
return to(StaticValueProvider.of(tableSpec));
@@ -1546,7 +873,7 @@ public class BigQueryIO {
/** Writes to the given table, specified as a {@link TableReference}. */
public Write<T> to(TableReference table) {
- return to(StaticValueProvider.of(toTableSpec(table)));
+ return to(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
}
/** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
@@ -1596,7 +923,7 @@ public class BigQueryIO {
@Override
public TableReference apply(ValueInSingleWindow<T> value) {
- return parseTableSpec(tableSpecFunction.apply(value));
+ return BigQueryHelpers.parseTableSpec(tableSpecFunction.apply(value));
}
}
@@ -1609,7 +936,7 @@ public class BigQueryIO {
*/
public Write<T> withSchema(TableSchema schema) {
return toBuilder()
- .setJsonSchema(StaticValueProvider.of(toJsonString(schema)))
+ .setJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema)))
.build();
}
@@ -1655,7 +982,7 @@ public class BigQueryIO {
checkState(
datasetService.isTableEmpty(tableRef),
"BigQuery table is not empty: %s.",
- BigQueryIO.toTableSpec(tableRef));
+ BigQueryHelpers.toTableSpec(tableRef));
}
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
@@ -1663,7 +990,7 @@ public class BigQueryIO {
}
throw new RuntimeException(
"unable to confirm BigQuery table emptiness for table "
- + BigQueryIO.toTableSpec(tableRef), e);
+ + BigQueryHelpers.toTableSpec(tableRef), e);
}
}
@@ -1758,7 +1085,7 @@ public class BigQueryIO {
ValueProvider<TableReference> table = getTableWithDefaultProject(options);
- final String stepUuid = randomUUIDString();
+ final String stepUuid = BigQueryHelpers.randomUUIDString();
String tempLocation = options.getTempLocation();
String tempFilePrefix;
@@ -1953,7 +1280,7 @@ public class BigQueryIO {
/** Returns the table schema. */
public TableSchema getSchema() {
- return fromJsonString(
+ return BigQueryHelpers.fromJsonString(
getJsonSchema() == null ? null : getJsonSchema().get(), TableSchema.class);
}
@@ -1979,7 +1306,7 @@ public class BigQueryIO {
TableReference tableRef = table.get();
tableRef.setProjectId(bqOptions.getProject());
return NestedValueProvider.of(StaticValueProvider.of(
- toJsonString(tableRef)), new JsonTableRefToTableRef());
+ BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef());
}
return table;
}
@@ -2128,7 +1455,8 @@ public class BigQueryIO {
List<String> partition = Lists.newArrayList(c.element().getValue()).get(0);
String jobIdPrefix = String.format(
c.sideInput(jobIdToken) + "_%05d", c.element().getKey());
- TableReference ref = fromJsonString(jsonTableRef.get(), TableReference.class);
+ TableReference ref = BigQueryHelpers.fromJsonString(jsonTableRef.get(),
+ TableReference.class);
if (!singlePartition) {
ref.setTableId(jobIdPrefix);
}
@@ -2138,13 +1466,13 @@ public class BigQueryIO {
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
jobIdPrefix,
ref,
- fromJsonString(
+ BigQueryHelpers.fromJsonString(
jsonSchema == null ? null : jsonSchema.get(), TableSchema.class),
partition,
writeDisposition,
createDisposition,
tableDescription);
- c.output(toJsonString(ref));
+ c.output(BigQueryHelpers.toJsonString(ref));
removeTemporaryFiles(c.getPipelineOptions(), tempFilePrefix, partition);
}
@@ -2176,7 +1504,7 @@ public class BigQueryIO {
.setJobId(jobId);
jobService.startLoadJob(jobRef, loadConfig);
Job loadJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
- Status jobStatus = parseStatus(loadJob);
+ Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
switch (jobStatus) {
case SUCCEEDED:
if (tableDescription != null) {
@@ -2185,14 +1513,15 @@ public class BigQueryIO {
return;
case UNKNOWN:
throw new RuntimeException(String.format(
- "UNKNOWN status of load job [%s]: %s.", jobId, jobToPrettyString(loadJob)));
+ "UNKNOWN status of load job [%s]: %s.", jobId,
+ BigQueryHelpers.jobToPrettyString(loadJob)));
case FAILED:
lastFailedLoadJob = loadJob;
continue;
default:
throw new IllegalStateException(String.format(
"Unexpected status [%s] of load job: %s.",
- jobStatus, jobToPrettyString(loadJob)));
+ jobStatus, BigQueryHelpers.jobToPrettyString(loadJob)));
}
}
throw new RuntimeException(String.format(
@@ -2200,7 +1529,7 @@ public class BigQueryIO {
+ "reached max retries: %d, last failed load job: %s.",
jobIdPrefix,
Write.MAX_RETRY_JOBS,
- jobToPrettyString(lastFailedLoadJob)));
+ BigQueryHelpers.jobToPrettyString(lastFailedLoadJob)));
}
static void removeTemporaryFiles(
@@ -2281,13 +1610,13 @@ public class BigQueryIO {
List<TableReference> tempTables = Lists.newArrayList();
for (String table : tempTablesJson) {
- tempTables.add(fromJsonString(table, TableReference.class));
+ tempTables.add(BigQueryHelpers.fromJsonString(table, TableReference.class));
}
copy(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class)),
c.sideInput(jobIdToken),
- fromJsonString(jsonTableRef.get(), TableReference.class),
+ BigQueryHelpers.fromJsonString(jsonTableRef.get(), TableReference.class),
tempTables,
writeDisposition,
createDisposition,
@@ -2322,7 +1651,7 @@ public class BigQueryIO {
.setJobId(jobId);
jobService.startCopyJob(jobRef, copyConfig);
Job copyJob = jobService.pollJob(jobRef, Write.LOAD_JOB_POLL_MAX_RETRIES);
- Status jobStatus = parseStatus(copyJob);
+ Status jobStatus = BigQueryHelpers.parseStatus(copyJob);
switch (jobStatus) {
case SUCCEEDED:
if (tableDescription != null) {
@@ -2331,14 +1660,15 @@ public class BigQueryIO {
return;
case UNKNOWN:
throw new RuntimeException(String.format(
- "UNKNOWN status of copy job [%s]: %s.", jobId, jobToPrettyString(copyJob)));
+ "UNKNOWN status of copy job [%s]: %s.", jobId,
+ BigQueryHelpers.jobToPrettyString(copyJob)));
case FAILED:
lastFailedCopyJob = copyJob;
continue;
default:
throw new IllegalStateException(String.format(
"Unexpected status [%s] of load job: %s.",
- jobStatus, jobToPrettyString(copyJob)));
+ jobStatus, BigQueryHelpers.jobToPrettyString(copyJob)));
}
}
throw new RuntimeException(String.format(
@@ -2346,17 +1676,17 @@ public class BigQueryIO {
+ "reached max retries: %d, last failed copy job: %s.",
jobIdPrefix,
Write.MAX_RETRY_JOBS,
- jobToPrettyString(lastFailedCopyJob)));
+ BigQueryHelpers.jobToPrettyString(lastFailedCopyJob)));
}
static void removeTemporaryTables(DatasetService tableService,
List<TableReference> tempTables) {
for (TableReference tableRef : tempTables) {
try {
- LOG.debug("Deleting table {}", toJsonString(tableRef));
+ LOG.debug("Deleting table {}", BigQueryHelpers.toJsonString(tableRef));
tableService.deleteTable(tableRef);
} catch (Exception e) {
- LOG.warn("Failed to delete the table {}", toJsonString(tableRef), e);
+ LOG.warn("Failed to delete the table {}", BigQueryHelpers.toJsonString(tableRef), e);
}
}
}
@@ -2376,14 +1706,6 @@ public class BigQueryIO {
}
}
- private static String jobToPrettyString(@Nullable Job job) throws IOException {
- return job == null ? "null" : job.toPrettyString();
- }
-
- private static String statusToPrettyString(@Nullable JobStatus status) throws IOException {
- return status == null ? "Unknown status: null." : status.toPrettyString();
- }
-
private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
try {
datasetService.getDataset(table.getProjectId(), table.getDatasetId());
@@ -2391,14 +1713,14 @@ public class BigQueryIO {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
throw new IllegalArgumentException(
- String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table)),
+ String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryHelpers.toTableSpec(table)),
e);
} else if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(
String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset",
- BigQueryIO.toTableSpec(table)),
+ BigQueryHelpers.toTableSpec(table)),
e);
}
}
@@ -2411,13 +1733,14 @@ public class BigQueryIO {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
throw new IllegalArgumentException(
- String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table)), e);
+ String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryHelpers.toTableSpec(table)),
+ e);
} else if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(
String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table",
- BigQueryIO.toTableSpec(table)),
+ BigQueryHelpers.toTableSpec(table)),
e);
}
}
@@ -2492,8 +1815,9 @@ public class BigQueryIO {
@ProcessElement
public void processElement(ProcessContext context) {
String tableSpec = context.element().getKey().getKey();
- List<TableRow> rows = getOrCreateMapListValue(tableRows, tableSpec);
- List<String> uniqueIds = getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec);
+ List<TableRow> rows = BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec);
+ List<String> uniqueIds = BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows,
+ tableSpec);
rows.add(context.element().getValue().tableRow);
uniqueIds.add(context.element().getValue().uniqueId);
@@ -2526,7 +1850,7 @@ public class BigQueryIO {
public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec)
throws InterruptedException, IOException {
- TableReference tableReference = parseTableSpec(tableSpec);
+ TableReference tableReference = BigQueryHelpers.parseTableSpec(tableSpec);
if (createDisposition != createDisposition.CREATE_NEVER
&& !createdTables.contains(tableSpec)) {
synchronized (createdTables) {
@@ -2723,7 +2047,7 @@ public class BigQueryIO {
TableReference tableRef = table.get()
.setProjectId(options.as(BigQueryOptions.class).getProject());
table = NestedValueProvider.of(
- StaticValueProvider.of(toJsonString(tableRef)),
+ StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
new JsonTableRefToTableRef());
}
this.tableSpec = NestedValueProvider.of(table, new TableRefToTableSpec());
@@ -2779,7 +2103,7 @@ public class BigQueryIO {
if (table.getProjectId() == null) {
table.setProjectId(options.getProject());
}
- return toTableSpec(table);
+ return BigQueryHelpers.toTableSpec(table);
}
}
}
@@ -2858,68 +2182,9 @@ public class BigQueryIO {
UNKNOWN,
}
- private static Status parseStatus(@Nullable Job job) {
- if (job == null) {
- return Status.UNKNOWN;
- }
- JobStatus status = job.getStatus();
- if (status.getErrorResult() != null) {
- return Status.FAILED;
- } else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
- return Status.FAILED;
- } else {
- return Status.SUCCEEDED;
- }
- }
-
- @VisibleForTesting
- static String toJsonString(Object item) {
- if (item == null) {
- return null;
- }
- try {
- return JSON_FACTORY.toString(item);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()),
- e);
- }
- }
-
- @VisibleForTesting
- static <T> T fromJsonString(String json, Class<T> clazz) {
- if (json == null) {
- return null;
- }
- try {
- return JSON_FACTORY.fromString(json, clazz);
- } catch (IOException e) {
- throw new RuntimeException(
- String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json),
- e);
- }
- }
-
- /**
- * Returns a randomUUID string.
- *
- * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset id.
- */
- private static String randomUUIDString() {
- return UUID.randomUUID().toString().replaceAll("-", "");
- }
-
/////////////////////////////////////////////////////////////////////////////
/** Disallow construction of utility class. */
private BigQueryIO() {}
- private static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
- List<V> value = map.get(key);
- if (value == null) {
- value = new ArrayList<>();
- map.put(key, value);
- }
- return value;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
new file mode 100644
index 0000000..a909957
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -0,0 +1,186 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToProjectId;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+
+/**
+ * A {@link BigQuerySourceBase} for querying BigQuery tables.
+ */
+@VisibleForTesting
+class BigQueryQuerySource extends BigQuerySourceBase {
+
+ static BigQueryQuerySource create(
+ ValueProvider<String> jobIdToken,
+ ValueProvider<String> query,
+ ValueProvider<TableReference> queryTempTableRef,
+ Boolean flattenResults,
+ Boolean useLegacySql,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ return new BigQueryQuerySource(
+ jobIdToken,
+ query,
+ queryTempTableRef,
+ flattenResults,
+ useLegacySql,
+ extractDestinationDir,
+ bqServices);
+ }
+
+ private final ValueProvider<String> query;
+ private final ValueProvider<String> jsonQueryTempTable;
+ private final Boolean flattenResults;
+ private final Boolean useLegacySql;
+ private transient AtomicReference<JobStatistics> dryRunJobStats;
+
+ private BigQueryQuerySource(
+ ValueProvider<String> jobIdToken,
+ ValueProvider<String> query,
+ ValueProvider<TableReference> queryTempTableRef,
+ Boolean flattenResults,
+ Boolean useLegacySql,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ super(jobIdToken, extractDestinationDir, bqServices,
+ NestedValueProvider.of(
+ checkNotNull(queryTempTableRef, "queryTempTableRef"), new TableRefToProjectId()));
+ this.query = checkNotNull(query, "query");
+ this.jsonQueryTempTable = NestedValueProvider.of(
+ queryTempTableRef, new TableRefToJson());
+ this.flattenResults = checkNotNull(flattenResults, "flattenResults");
+ this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
+ this.dryRunJobStats = new AtomicReference<>();
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
+ }
+
+ @Override
+ public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ return new BigQueryReader(this, bqServices.getReaderFromQuery(
+ bqOptions, executingProject.get(), createBasicQueryConfig()));
+ }
+
+ @Override
+ protected TableReference getTableToExtract(BigQueryOptions bqOptions)
+ throws IOException, InterruptedException {
+ // 1. Find the location of the query.
+ String location = null;
+ List<TableReference> referencedTables =
+ dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+ DatasetService tableService = bqServices.getDatasetService(bqOptions);
+ if (referencedTables != null && !referencedTables.isEmpty()) {
+ TableReference queryTable = referencedTables.get(0);
+ location = tableService.getTable(queryTable).getLocation();
+ }
+
+ // 2. Create the temporary dataset in the query location.
+ TableReference tableToExtract =
+ BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+ tableService.createDataset(
+ tableToExtract.getProjectId(),
+ tableToExtract.getDatasetId(),
+ location,
+ "Dataset for BigQuery query job temporary table");
+
+ // 3. Execute the query.
+ String queryJobId = jobIdToken.get() + "-query";
+ executeQuery(
+ executingProject.get(),
+ queryJobId,
+ tableToExtract,
+ bqServices.getJobService(bqOptions));
+ return tableToExtract;
+ }
+
+ @Override
+ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+ checkState(jsonQueryTempTable.isAccessible());
+ TableReference tableToRemove =
+ BigQueryIO.JSON_FACTORY.fromString(jsonQueryTempTable.get(), TableReference.class);
+
+ DatasetService tableService = bqServices.getDatasetService(bqOptions);
+ tableService.deleteTable(tableToRemove);
+ tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("query", query));
+ }
+
+ private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
+ throws InterruptedException, IOException {
+ if (dryRunJobStats.get() == null) {
+ JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
+ executingProject.get(), createBasicQueryConfig());
+ dryRunJobStats.compareAndSet(null, jobStats);
+ }
+ return dryRunJobStats.get();
+ }
+
+ private void executeQuery(
+ String executingProject,
+ String jobId,
+ TableReference destinationTable,
+ JobService jobService) throws IOException, InterruptedException {
+ JobReference jobRef = new JobReference()
+ .setProjectId(executingProject)
+ .setJobId(jobId);
+
+ JobConfigurationQuery queryConfig = createBasicQueryConfig()
+ .setAllowLargeResults(true)
+ .setCreateDisposition("CREATE_IF_NEEDED")
+ .setDestinationTable(destinationTable)
+ .setPriority("BATCH")
+ .setWriteDisposition("WRITE_EMPTY");
+
+ jobService.startQueryJob(jobRef, queryConfig);
+ Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
+ throw new IOException(String.format(
+ "Query job %s failed, status: %s.", jobId,
+ BigQueryHelpers.statusToPrettyString(job.getStatus())));
+ }
+ }
+
+ private JobConfigurationQuery createBasicQueryConfig() {
+ return new JobConfigurationQuery()
+ .setFlattenResults(flattenResults)
+ .setQuery(query.get())
+ .setUseLegacySql(useLegacySql);
+ }
+
+ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+ in.defaultReadObject();
+ dryRunJobStats = new AtomicReference<>();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 15ca262..c8e6ed8 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -444,7 +444,7 @@ class BigQueryServicesImpl implements BigQueryServices {
@Override
public void createTable(Table table) throws InterruptedException, IOException {
LOG.info("Trying to create BigQuery table: {}",
- BigQueryIO.toTableSpec(table.getTableReference()));
+ BigQueryHelpers.toTableSpec(table.getTableReference()));
BackOff backoff =
new ExponentialBackOff.Builder()
.setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS)
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
new file mode 100644
index 0000000..ff50e6d
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -0,0 +1,178 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.TableRowJsonCoder;
+import org.apache.beam.sdk.io.AvroSource;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An abstract {@link BoundedSource} to read a table from BigQuery.
+ *
+ * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
+ * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource},
+ * and {@link BigQueryQuerySource}, depending on the configuration of the read.
+ * Specifically,
+ * <ul>
+ * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
+ * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
+ * </ul>
+ * <p>Subclasses supply the table to extract ({@link #getTableToExtract}) and any cleanup of
+ * temporary resources ({@link #cleanupTempResource}).
+ */
+abstract class BigQuerySourceBase extends BoundedSource<TableRow> {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class);
+
+ // The maximum number of retries to poll a BigQuery job.
+ protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+ // Token from which the extract job id is derived (via BigQueryIO.getExtractJobId).
+ protected final ValueProvider<String> jobIdToken;
+ // GCS directory that receives the exported Avro files.
+ protected final String extractDestinationDir;
+ // Abstraction over the BigQuery APIs; replaceable in tests.
+ protected final BigQueryServices bqServices;
+ // Project under which the extract job is issued.
+ protected final ValueProvider<String> executingProject;
+
+ BigQuerySourceBase(
+ ValueProvider<String> jobIdToken,
+ String extractDestinationDir,
+ BigQueryServices bqServices,
+ ValueProvider<String> executingProject) {
+ // All arguments are required; fail fast on a null.
+ this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+ this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
+ this.bqServices = checkNotNull(bqServices, "bqServices");
+ this.executingProject = checkNotNull(executingProject, "executingProject");
+ }
+
+ @Override
+ public List<BoundedSource<TableRow>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ // Run a BigQuery extract job to snapshot the table into Avro files on GCS,
+ // then return one sub-source per exported file. desiredBundleSizeBytes is
+ // ignored here: bundle granularity is fixed by the export.
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ TableReference tableToExtract = getTableToExtract(bqOptions);
+ JobService jobService = bqServices.getJobService(bqOptions);
+ String extractJobId = BigQueryIO.getExtractJobId(jobIdToken);
+ List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+ // Fetch the schema once; it is serialized (as JSON) into every sub-source.
+ TableSchema tableSchema = bqServices.getDatasetService(bqOptions)
+ .getTable(tableToExtract).getSchema();
+
+ cleanupTempResource(bqOptions);
+ return createSources(tempFiles, tableSchema);
+ }
+
+ /** Returns the table that the extract job should export. */
+ protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
+
+ /** Cleans up any temporary resources created for the extract (may be a no-op). */
+ protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
+
+ @Override
+ public void validate() {
+ // Do nothing, validation is done in BigQuery.Read.
+ }
+
+ @Override
+ public Coder<TableRow> getDefaultOutputCoder() {
+ return TableRowJsonCoder.of();
+ }
+
+ /**
+ * Starts an extract job exporting {@code table} as Avro under
+ * {@code extractDestinationDir}, blocks until it completes, and returns the
+ * paths of the produced files.
+ *
+ * @throws IOException if the extract job finishes with a non-SUCCEEDED status
+ */
+ private List<String> executeExtract(
+ String jobId, TableReference table, JobService jobService)
+ throws InterruptedException, IOException {
+ JobReference jobRef = new JobReference()
+ .setProjectId(executingProject.get())
+ .setJobId(jobId);
+
+ String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
+ JobConfigurationExtract extract = new JobConfigurationExtract()
+ .setSourceTable(table)
+ .setDestinationFormat("AVRO")
+ .setDestinationUris(ImmutableList.of(destinationUri));
+
+ LOG.info("Starting BigQuery extract job: {}", jobId);
+ jobService.startExtractJob(jobRef, extract);
+ // Poll effectively forever: JOB_POLL_MAX_RETRIES is Integer.MAX_VALUE.
+ Job extractJob =
+ jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ if (BigQueryHelpers.parseStatus(extractJob) != Status.SUCCEEDED) {
+ throw new IOException(String.format(
+ "Extract job %s failed, status: %s.",
+ extractJob.getJobReference().getJobId(),
+ BigQueryHelpers.statusToPrettyString(extractJob.getStatus())));
+ }
+
+ List<String> tempFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
+ return ImmutableList.copyOf(tempFiles);
+ }
+
+ /**
+ * Wraps each exported Avro file in an {@link AvroSource} whose records are
+ * converted back to {@link TableRow}s using the given schema.
+ */
+ private List<BoundedSource<TableRow>> createSources(
+ List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+ // The schema is carried as a JSON string so that the function below stays serializable.
+ final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(tableSchema);
+
+ SerializableFunction<GenericRecord, TableRow> function =
+ new SerializableFunction<GenericRecord, TableRow>() {
+ @Override
+ public TableRow apply(GenericRecord input) {
+ // NOTE(review): the schema JSON is re-parsed on every apply() call, i.e.
+ // once per record; consider caching the deserialized TableSchema.
+ return BigQueryAvroUtils.convertGenericRecordToTableRow(
+ input, BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class));
+ }};
+
+ List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
+ for (String fileName : files) {
+ avroSources.add(new TransformingSource<>(
+ AvroSource.from(fileName), function, getDefaultOutputCoder()));
+ }
+ return ImmutableList.copyOf(avroSources);
+ }
+
+ /** A {@link BoundedReader} that delegates every call to an underlying JSON reader. */
+ protected static class BigQueryReader extends BoundedReader<TableRow> {
+ private final BigQuerySourceBase source;
+ private final BigQueryServices.BigQueryJsonReader reader;
+
+ BigQueryReader(
+ BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
+ this.source = source;
+ this.reader = reader;
+ }
+
+ @Override
+ public BoundedSource<TableRow> getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return reader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return reader.advance();
+ }
+
+ @Override
+ public TableRow getCurrent() throws NoSuchElementException {
+ return reader.getCurrent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
new file mode 100644
index 0000000..aae0faa
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -0,0 +1,86 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
+import org.apache.beam.sdk.options.BigQueryOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/**
+ * A {@link BigQuerySourceBase} for reading BigQuery tables.
+ */
+@VisibleForTesting
+class BigQueryTableSource extends BigQuerySourceBase {
+
+ /** Static factory; see {@link BigQuerySourceBase} for the meaning of the common arguments. */
+ static BigQueryTableSource create(
+ ValueProvider<String> jobIdToken,
+ ValueProvider<TableReference> table,
+ String extractDestinationDir,
+ BigQueryServices bqServices,
+ ValueProvider<String> executingProject) {
+ return new BigQueryTableSource(
+ jobIdToken, table, extractDestinationDir, bqServices, executingProject);
+ }
+
+ // The table reference, stored as a JSON-string ValueProvider.
+ private final ValueProvider<String> jsonTable;
+ // Lazily-populated cache for getEstimatedSizeBytes; holds null until first queried.
+ private final AtomicReference<Long> tableSizeBytes;
+
+ private BigQueryTableSource(
+ ValueProvider<String> jobIdToken,
+ ValueProvider<TableReference> table,
+ String extractDestinationDir,
+ BigQueryServices bqServices,
+ ValueProvider<String> executingProject) {
+ super(jobIdToken, extractDestinationDir, bqServices, executingProject);
+ this.jsonTable = NestedValueProvider.of(checkNotNull(table, "table"), new TableRefToJson());
+ this.tableSizeBytes = new AtomicReference<>();
+ }
+
+ @Override
+ protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
+ // The table must be resolvable by the time the pipeline executes.
+ checkState(jsonTable.isAccessible());
+ return BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
+ }
+
+ @Override
+ public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ checkState(jsonTable.isAccessible());
+ TableReference tableRef = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
+ TableReference.class);
+ return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
+ }
+
+ @Override
+ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ // Look up the table's byte count once and cache it for subsequent calls.
+ if (tableSizeBytes.get() == null) {
+ TableReference table = BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(),
+ TableReference.class);
+
+ Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
+ .getTable(table).getNumBytes();
+ tableSizeBytes.compareAndSet(null, numBytes);
+ }
+ return tableSizeBytes.get();
+ }
+
+ @Override
+ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+ // Do nothing: reading directly from a table creates no temporary resources.
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder.add(DisplayData.item("table", jsonTable));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
new file mode 100644
index 0000000..612afbe
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/PassThroughThenCleanup.java
@@ -0,0 +1,66 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+
+/**
+ * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
+ * has been processed.
+ *
+ * <p>The input passes through an identity {@link ParDo} that declares a second output to which
+ * nothing is ever emitted. A singleton view over that (empty) output is consumed as a side input
+ * by the cleanup step, which forces the cleanup to run only after the main input has been fully
+ * processed.
+ */
+@VisibleForTesting
+class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
+ private CleanupOperation cleanupOperation;
+
+ PassThroughThenCleanup(CleanupOperation cleanupOperation) {
+ this.cleanupOperation = cleanupOperation;
+ }
+
+ @Override
+ public PCollection<T> expand(PCollection<T> input) {
+ TupleTag<T> mainOutput = new TupleTag<>();
+ TupleTag<Void> cleanupSignal = new TupleTag<>();
+ // IdentityFn only ever writes to mainOutput, so cleanupSignal stays empty.
+ PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
+ .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
+
+ // withDefaultValue(null) keeps the singleton view well-defined even though
+ // the cleanupSignal collection contains no elements.
+ PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
+ .setCoder(VoidCoder.of())
+ .apply(View.<Void>asSingleton().withDefaultValue(null));
+
+ // The side input creates the ordering dependency: this ParDo cannot execute
+ // until cleanupSignalView is ready, i.e. after the input has been processed.
+ input.getPipeline()
+ .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
+ .apply("Cleanup", ParDo.of(
+ new DoFn<CleanupOperation, Void>() {
+ @ProcessElement
+ public void processElement(ProcessContext c)
+ throws Exception {
+ c.element().cleanup(c.getPipelineOptions());
+ }
+ }).withSideInputs(cleanupSignalView));
+
+ return outputs.get(mainOutput);
+ }
+
+ /** Emits every element unchanged, to the main output only. */
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @ProcessElement
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ }
+
+ /** A serializable cleanup action, run once the main input has been processed. */
+ abstract static class CleanupOperation implements Serializable {
+ abstract void cleanup(PipelineOptions options) throws Exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/2cc2e81f/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
new file mode 100644
index 0000000..a86adfb
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TransformingSource.java
@@ -0,0 +1,118 @@
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Instant;
+
+/**
+ * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
+ * and transforms elements to type {@code V}.
+ *
+ * <p>Splitting, size estimation, progress reporting, and dynamic work rebalancing are all
+ * delegated to the wrapped source; only the element values are mapped.
+ */
+@VisibleForTesting
+class TransformingSource<T, V> extends BoundedSource<V> {
+ private final BoundedSource<T> boundedSource;
+ private final SerializableFunction<T, V> function;
+ private final Coder<V> outputCoder;
+
+ TransformingSource(
+ BoundedSource<T> boundedSource,
+ SerializableFunction<T, V> function,
+ Coder<V> outputCoder) {
+ this.boundedSource = checkNotNull(boundedSource, "boundedSource");
+ this.function = checkNotNull(function, "function");
+ this.outputCoder = checkNotNull(outputCoder, "outputCoder");
+ }
+
+ @Override
+ public List<? extends BoundedSource<V>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ // Split the delegate, then re-wrap each sub-source with the same function/coder.
+ return Lists.transform(
+ boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
+ new Function<BoundedSource<T>, BoundedSource<V>>() {
+ @Override
+ public BoundedSource<V> apply(BoundedSource<T> input) {
+ return new TransformingSource<>(input, function, outputCoder);
+ }
+ });
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return boundedSource.getEstimatedSizeBytes(options);
+ }
+
+ @Override
+ public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
+ return new TransformingReader(boundedSource.createReader(options));
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+ @Override
+ public Coder<V> getDefaultOutputCoder() {
+ return outputCoder;
+ }
+
+ /**
+ * Reader that delegates to the wrapped reader and maps each element through
+ * {@code function}. Declared as a non-static inner class so it can reference the
+ * enclosing source's {@code function} and {@code outputCoder}.
+ */
+ private class TransformingReader extends BoundedReader<V> {
+ private final BoundedReader<T> boundedReader;
+
+ private TransformingReader(BoundedReader<T> boundedReader) {
+ this.boundedReader = checkNotNull(boundedReader, "boundedReader");
+ }
+
+ @Override
+ public synchronized BoundedSource<V> getCurrentSource() {
+ return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return boundedReader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return boundedReader.advance();
+ }
+
+ @Override
+ public V getCurrent() throws NoSuchElementException {
+ // The mapping is applied on each getCurrent() call, not cached.
+ T current = boundedReader.getCurrent();
+ return function.apply(current);
+ }
+
+ @Override
+ public void close() throws IOException {
+ boundedReader.close();
+ }
+
+ @Override
+ public synchronized BoundedSource<V> splitAtFraction(double fraction) {
+ // Propagate dynamic work rebalancing: wrap the residual split from the
+ // delegate, or return null if the delegate declines to split.
+ BoundedSource<T> split = boundedReader.splitAtFraction(fraction);
+ return split == null ? null : new TransformingSource<>(split, function, outputCoder);
+ }
+
+ @Override
+ public Double getFractionConsumed() {
+ return boundedReader.getFractionConsumed();
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return boundedReader.getCurrentTimestamp();
+ }
+ }
+}