Posted to commits@beam.apache.org by dh...@apache.org on 2016/05/13 17:48:44 UTC
[3/3] incubator-beam git commit: [BEAM-48] Implement BigQueryIO.Read as Source
[BEAM-48] Implement BigQueryIO.Read as Source
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/987350b7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/987350b7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/987350b7
Branch: refs/heads/master
Commit: 987350b7469f7dd04d9bc4c2145516b579297f61
Parents: 93a5d39
Author: Pei He <pe...@google.com>
Authored: Wed Apr 13 12:03:15 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri May 13 10:48:27 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/BigQueryIO.java | 877 ++++++++++++++++---
.../apache/beam/sdk/util/BigQueryServices.java | 109 ++-
.../beam/sdk/util/BigQueryServicesImpl.java | 347 ++++++--
.../org/apache/beam/sdk/io/BigQueryIOTest.java | 550 ++++++++++--
.../beam/sdk/util/BigQueryServicesImplTest.java | 40 +-
5 files changed, 1657 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
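For orientation before the diff: a minimal sketch of how the reworked transform is used. The class name, table spec, and query string below are placeholders, and the surrounding boilerplate assumes the SDK's standard PipelineOptionsFactory entry point; only the Read calls reflect the API touched by this commit.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class BigQueryReadSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Table read: with this commit, expands to a BigQueryTableSource that
        // snapshots the table to GCS with an extract job and reads the Avro files.
        PCollection<TableRow> rows =
            p.apply(BigQueryIO.Read.named("ReadTable")
                .from("my-project:my_dataset.my_table"));

        // Query read: expands to a BigQueryQuerySource that dry-runs the query,
        // materializes the results into a temporary table, then extracts as above.
        PCollection<TableRow> queried =
            p.apply(BigQueryIO.Read.named("ReadQuery")
                .fromQuery("SELECT word, word_count FROM [samples.shakespeare]"));

        p.run();
      }
    }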
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 15872e5..a5ef39f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -36,21 +36,26 @@ import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.BigQueryOptions;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.AvroUtils;
import org.apache.beam.sdk.util.BigQueryServices;
+import org.apache.beam.sdk.util.BigQueryServices.DatasetService;
import org.apache.beam.sdk.util.BigQueryServices.JobService;
import org.apache.beam.sdk.util.BigQueryServicesImpl;
import org.apache.beam.sdk.util.BigQueryTableInserter;
import org.apache.beam.sdk.util.BigQueryTableRowIterator;
+import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory;
import org.apache.beam.sdk.util.IOChannelFactory;
import org.apache.beam.sdk.util.IOChannelUtils;
import org.apache.beam.sdk.util.MimeTypes;
@@ -58,18 +63,27 @@ import org.apache.beam.sdk.util.PropertyNames;
import org.apache.beam.sdk.util.Reshuffle;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.Transport;
-import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import com.google.api.client.json.JsonFactory;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
+import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
+import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableReference;
@@ -77,31 +91,42 @@ import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
import com.google.common.base.MoreObjects;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.avro.generic.GenericRecord;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.io.OutputStream;
+import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -362,27 +387,39 @@ public class BigQueryIO {
* {@link PCollection} of {@link TableRow TableRows}.
*/
public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
- TableReference table;
- final String query;
+ @Nullable final String jsonTableRef;
+ @Nullable final String query;
final boolean validate;
- @Nullable
- Boolean flattenResults;
+ @Nullable final Boolean flattenResults;
+ @Nullable final BigQueryServices testBigQueryServices;
private static final String QUERY_VALIDATION_FAILURE_ERROR =
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ " pipeline, This validation can be disabled using #withoutValidation.";
+ // The maximum number of retries to poll a BigQuery job in the cleanup phase.
+ // We expect the jobs to be DONE already, so we don't need a high max retries.
+ private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10;
+
private Bound() {
- this(null, null, null, true, null);
+ this(
+ null /* name */,
+ null /* query */,
+ null /* jsonTableRef */,
+ true /* validate */,
+ null /* flattenResults */,
+ null /* testBigQueryServices */);
}
- private Bound(String name, String query, TableReference reference, boolean validate,
- Boolean flattenResults) {
+ private Bound(
+ String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate,
+ @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) {
super(name);
- this.table = reference;
+ this.jsonTableRef = jsonTableRef;
this.query = query;
this.validate = validate;
this.flattenResults = flattenResults;
+ this.testBigQueryServices = testBigQueryServices;
}
/**
@@ -391,7 +428,7 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Bound named(String name) {
- return new Bound(name, query, table, validate, flattenResults);
+ return new Bound(name, query, jsonTableRef, validate, flattenResults, testBigQueryServices);
}
/**
@@ -410,7 +447,8 @@ public class BigQueryIO {
* <p>Does not modify this object.
*/
public Bound from(TableReference table) {
- return new Bound(name, query, table, validate, flattenResults);
+ return new Bound(
+ name, query, toJsonString(table), validate, flattenResults, testBigQueryServices);
}
/**
@@ -424,15 +462,15 @@ public class BigQueryIO {
* {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
*/
public Bound fromQuery(String query) {
- return new Bound(name, query, table, validate,
- MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
+ return new Bound(name, query, jsonTableRef, validate,
+ MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), testBigQueryServices);
}
/**
* Disable table validation.
*/
public Bound withoutValidation() {
- return new Bound(name, query, table, false, flattenResults);
+ return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices);
}
/**
@@ -443,35 +481,34 @@ public class BigQueryIO {
* from a table will cause an error during validation.
*/
public Bound withoutResultFlattening() {
- return new Bound(name, query, table, validate, false);
+ return new Bound(name, query, jsonTableRef, validate, false, testBigQueryServices);
+ }
+
+ @VisibleForTesting
+ Bound withTestServices(BigQueryServices testServices) {
+ return new Bound(name, query, jsonTableRef, validate, flattenResults, testServices);
}
@Override
public void validate(PInput input) {
- if (table == null && query == null) {
- throw new IllegalStateException(
- "Invalid BigQuery read operation, either table reference or query has to be set");
- } else if (table != null && query != null) {
- throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
- + " query and a table, only one of these should be provided");
- } else if (table != null && flattenResults != null) {
- throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
- + " table with a result flattening preference, which is not configurable");
- } else if (query != null && flattenResults == null) {
- throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
- + " query without a result flattening preference");
- }
-
- BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
- if (table != null && table.getProjectId() == null) {
- // If user does not specify a project we assume the table to be located in the project
- // that owns the Dataflow job.
- LOG.warn(String.format(SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(),
- table.getTableId(), bqOptions.getProject()));
- table.setProjectId(bqOptions.getProject());
- }
-
if (validate) {
+ BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ TableReference table = getTableWithDefaultProject(bqOptions);
+ if (table == null && query == null) {
+ throw new IllegalStateException(
+ "Invalid BigQuery read operation, either table reference or query has to be set");
+ } else if (table != null && query != null) {
+ throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
+ + " query and a table, only one of these should be provided");
+ } else if (table != null && flattenResults != null) {
+ throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+ + " table with a result flattening preference, which is not configurable");
+ } else if (query != null && flattenResults == null) {
+ throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+ + " query without a result flattening preference");
+ }
+
// Check for source table/query presence for early failure notification.
// Note that a presence check can fail if the table or dataset are created by earlier
// stages of the pipeline or if a query depends on earlier stages of a pipeline. For these
@@ -504,14 +541,83 @@ public class BigQueryIO {
@Override
public PCollection<TableRow> apply(PInput input) {
- return PCollection.<TableRow>createPrimitiveOutputInternal(
- input.getPipeline(),
- WindowingStrategy.globalDefault(),
- IsBounded.BOUNDED)
- // Force the output's Coder to be what the read is using, and
- // unchangeable later, to ensure that we read the input in the
- // format specified by the Read transform.
- .setCoder(TableRowJsonCoder.of());
+ String uuid = randomUUIDString();
+ final String jobIdToken = "beam_job_" + uuid;
+
+ BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+ BoundedSource<TableRow> source;
+ final BigQueryServices bqServices = getBigQueryServices();
+
+ final String extractDestinationDir;
+ String tempLocation = bqOptions.getTempLocation();
+ try {
+ IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+ extractDestinationDir = factory.resolve(tempLocation, uuid);
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format("Failed to resolve extract destination directory in %s", tempLocation));
+ }
+
+ if (!Strings.isNullOrEmpty(query)) {
+ String projectId = bqOptions.getProject();
+ String queryTempDatasetId = "temp_dataset_" + uuid;
+ String queryTempTableId = "temp_table_" + uuid;
+
+ TableReference queryTempTableRef = new TableReference()
+ .setProjectId(projectId)
+ .setDatasetId(queryTempDatasetId)
+ .setTableId(queryTempTableId);
+
+ String jsonQueryTempTable;
+ try {
+ jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+ }
+ source = BigQueryQuerySource.create(
+ jobIdToken, query, jsonQueryTempTable, flattenResults,
+ extractDestinationDir, bqServices);
+ } else {
+ String jsonTable;
+ try {
+ jsonTable = JSON_FACTORY.toString(getTableWithDefaultProject(bqOptions));
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot initialize table to JSON strings.", e);
+ }
+ source = BigQueryTableSource.create(
+ jobIdToken, jsonTable, extractDestinationDir, bqServices);
+ }
+ PassThroughThenCleanup.CleanupOperation cleanupOperation =
+ new PassThroughThenCleanup.CleanupOperation() {
+ @Override
+ void cleanup(PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+
+ JobReference jobRef = new JobReference()
+ .setProjectId(bqOptions.getProject())
+ .setJobId(getExtractJobId(jobIdToken));
+ Job extractJob = bqServices.getJobService(bqOptions).pollJob(
+ jobRef, CLEANUP_JOB_POLL_MAX_RETRIES);
+
+ Collection<String> extractFiles = null;
+ if (extractJob != null) {
+ extractFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+ } else {
+ IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+ Collection<String> dirMatch = factory.match(extractDestinationDir);
+ if (!dirMatch.isEmpty()) {
+ extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
+ }
+ }
+ if (extractFiles != null && !extractFiles.isEmpty()) {
+ new GcsUtilFactory().create(options).remove(extractFiles);
+ }
+ }};
+ return input.getPipeline()
+ .apply(org.apache.beam.sdk.io.Read.from(source))
+ .setCoder(getDefaultOutputCoder())
+ .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
}
@Override
@@ -522,6 +628,7 @@ public class BigQueryIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
+ TableReference table = getTable();
if (table != null) {
builder.add(DisplayData.item("table", toTableSpec(table)));
@@ -533,22 +640,26 @@ public class BigQueryIO {
.addIfNotDefault(DisplayData.item("validation", validate), true);
}
- static {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
- @Override
- public void evaluate(
- Bound transform, DirectPipelineRunner.EvaluationContext context) {
- evaluateReadHelper(transform, context);
- }
- });
+ /**
+ * Returns the table to read, or {@code null} if reading from a query instead.
+ *
+ * <p>If the table's project is not specified, use the default one.
+ */
+ @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
+ TableReference table = getTable();
+ if (table != null && table.getProjectId() == null) {
+ // If user does not specify a project we assume the table to be located in
+ // the default project.
+ table.setProjectId(bqOptions.getProject());
+ }
+ return table;
}
/**
* Returns the table to read, or {@code null} if reading from a query instead.
*/
public TableReference getTable() {
- return table;
+ return fromJsonString(jsonTableRef, TableReference.class);
}
/**
@@ -571,12 +682,567 @@ public class BigQueryIO {
public Boolean getFlattenResults() {
return flattenResults;
}
+
+ private BigQueryServices getBigQueryServices() {
+ if (testBigQueryServices != null) {
+ return testBigQueryServices;
+ } else {
+ return new BigQueryServicesImpl();
+ }
+ }
}
/** Disallow construction of utility class. */
private Read() {}
}
+ /**
+ * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection}
+ * has been processed.
+ */
+ @VisibleForTesting
+ static class PassThroughThenCleanup<T> extends PTransform<PCollection<T>, PCollection<T>> {
+
+ private CleanupOperation cleanupOperation;
+
+ PassThroughThenCleanup(CleanupOperation cleanupOperation) {
+ this.cleanupOperation = cleanupOperation;
+ }
+
+ @Override
+ public PCollection<T> apply(PCollection<T> input) {
+ TupleTag<T> mainOutput = new TupleTag<>();
+ TupleTag<Void> cleanupSignal = new TupleTag<>();
+ PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn<T>())
+ .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
+
+ PCollectionView<Void> cleanupSignalView = outputs.get(cleanupSignal)
+ .setCoder(VoidCoder.of())
+ .apply(View.<Void>asSingleton().withDefaultValue(null));
+
+ input.getPipeline()
+ .apply("Create(CleanupOperation)", Create.of(cleanupOperation))
+ .apply("Cleanup", ParDo.of(
+ new DoFn<CleanupOperation, Void>() {
+ @Override
+ public void processElement(ProcessContext c)
+ throws Exception {
+ c.element().cleanup(c.getPipelineOptions());
+ }
+ }).withSideInputs(cleanupSignalView));
+
+ return outputs.get(mainOutput);
+ }
+
+ private static class IdentityFn<T> extends DoFn<T, T> {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.element());
+ }
+ }
+
+ abstract static class CleanupOperation implements Serializable {
+ abstract void cleanup(PipelineOptions options) throws Exception;
+ }
+ }
+
+ /**
+ * A {@link BigQuerySourceBase} for reading BigQuery tables.
+ */
+ @VisibleForTesting
+ static class BigQueryTableSource extends BigQuerySourceBase {
+
+ static BigQueryTableSource create(
+ String jobIdToken,
+ String jsonTable,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ return new BigQueryTableSource(jobIdToken, jsonTable, extractDestinationDir, bqServices);
+ }
+
+ private final String jsonTable;
+ private final AtomicReference<Long> tableSizeBytes;
+
+ private BigQueryTableSource(
+ String jobIdToken,
+ String jsonTable,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ super(jobIdToken, extractDestinationDir, bqServices);
+ this.jsonTable = checkNotNull(jsonTable, "jsonTable");
+ this.tableSizeBytes = new AtomicReference<>();
+ }
+
+ @Override
+ protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
+ return JSON_FACTORY.fromString(jsonTable, TableReference.class);
+ }
+
+ @Override
+ public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ TableReference tableRef = JSON_FACTORY.fromString(jsonTable, TableReference.class);
+ return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef));
+ }
+
+ @Override
+ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ if (tableSizeBytes.get() == null) {
+ TableReference table = JSON_FACTORY.fromString(jsonTable, TableReference.class);
+
+ Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class))
+ .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId())
+ .getNumBytes();
+ tableSizeBytes.compareAndSet(null, numBytes);
+ }
+ return tableSizeBytes.get();
+ }
+
+ @Override
+ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+ // Do nothing.
+ }
+ }
+
+ /**
+ * A {@link BigQuerySourceBase} for querying BigQuery tables.
+ */
+ @VisibleForTesting
+ static class BigQueryQuerySource extends BigQuerySourceBase {
+
+ static BigQueryQuerySource create(
+ String jobIdToken,
+ String query,
+ String jsonQueryTempTable,
+ Boolean flattenResults,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ return new BigQueryQuerySource(
+ jobIdToken, query, jsonQueryTempTable, flattenResults, extractDestinationDir, bqServices);
+ }
+
+ private final String query;
+ private final String jsonQueryTempTable;
+ private final Boolean flattenResults;
+ private transient AtomicReference<JobStatistics> dryRunJobStats;
+
+ private BigQueryQuerySource(
+ String jobIdToken,
+ String query,
+ String jsonQueryTempTable,
+ Boolean flattenResults,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ super(jobIdToken, extractDestinationDir, bqServices);
+ this.query = checkNotNull(query, "query");
+ this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable");
+ this.flattenResults = checkNotNull(flattenResults, "flattenResults");
+ this.dryRunJobStats = new AtomicReference<>();
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
+ }
+
+ @Override
+ public BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ return new BigQueryReader(this, bqServices.getReaderFromQuery(
+ bqOptions, query, bqOptions.getProject(), flattenResults));
+ }
+
+ @Override
+ protected TableReference getTableToExtract(BigQueryOptions bqOptions)
+ throws IOException, InterruptedException {
+ // 1. Find the location of the query.
+ TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions)
+ .getQuery()
+ .getReferencedTables()
+ .get(0);
+ DatasetService tableService = bqServices.getDatasetService(bqOptions);
+ String location = tableService.getTable(
+ dryRunTempTable.getProjectId(),
+ dryRunTempTable.getDatasetId(),
+ dryRunTempTable.getTableId()).getLocation();
+
+ // 2. Create the temporary dataset in the query location.
+ TableReference tableToExtract =
+ JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
+ tableService.createDataset(
+ tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
+
+ // 3. Execute the query.
+ String queryJobId = jobIdToken + "-query";
+ executeQuery(
+ queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions));
+ return tableToExtract;
+ }
+
+ @Override
+ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
+ TableReference tableToRemove =
+ JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
+
+ DatasetService tableService = bqServices.getDatasetService(bqOptions);
+ tableService.deleteTable(
+ tableToRemove.getProjectId(),
+ tableToRemove.getDatasetId(),
+ tableToRemove.getTableId());
+ tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
+ }
+
+ private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
+ throws InterruptedException, IOException {
+ if (dryRunJobStats.get() == null) {
+ String projectId = bqOptions.getProject();
+ JobStatistics jobStats =
+ bqServices.getJobService(bqOptions).dryRunQuery(projectId, query);
+ dryRunJobStats.compareAndSet(null, jobStats);
+ }
+ return dryRunJobStats.get();
+ }
+
+ private static void executeQuery(
+ String jobId,
+ String query,
+ TableReference destinationTable,
+ boolean flattenResults,
+ JobService jobService) throws IOException, InterruptedException {
+ JobReference jobRef = new JobReference()
+ .setProjectId(destinationTable.getProjectId())
+ .setJobId(jobId);
+ JobConfigurationQuery queryConfig = new JobConfigurationQuery();
+ queryConfig
+ .setQuery(query)
+ .setAllowLargeResults(true)
+ .setCreateDisposition("CREATE_IF_NEEDED")
+ .setDestinationTable(destinationTable)
+ .setFlattenResults(flattenResults)
+ .setPriority("BATCH")
+ .setWriteDisposition("WRITE_EMPTY");
+ jobService.startQueryJob(jobRef, queryConfig);
+ Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ if (parseStatus(job) != Status.SUCCEEDED) {
+ throw new IOException("Query job failed: " + jobId);
+ }
+ return;
+ }
+
+ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
+ in.defaultReadObject();
+ dryRunJobStats = new AtomicReference<>();
+ }
+ }
+
+ /**
+ * An abstract {@link BoundedSource} to read a table from BigQuery.
+ *
+ * <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then
+ * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource}
+ * and {@link BigQueryQuerySource}, depending on the configuration of the read.
+ * Specifically,
+ * <ul>
+ * <li>{@link BigQueryTableSource} is for reading BigQuery tables</li>
+ * <li>{@link BigQueryQuerySource} is for querying BigQuery tables</li>
+ * </ul>
+ * ...
+ */
+ private abstract static class BigQuerySourceBase extends BoundedSource<TableRow> {
+ // The maximum number of attempts to verify temp files.
+ private static final int MAX_FILES_VERIFY_ATTEMPTS = 10;
+
+ // The maximum number of retries to poll a BigQuery job.
+ protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
+
+ // The initial backoff for verifying temp files.
+ private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
+
+ protected final String jobIdToken;
+ protected final String extractDestinationDir;
+ protected final BigQueryServices bqServices;
+
+ private BigQuerySourceBase(
+ String jobIdToken,
+ String extractDestinationDir,
+ BigQueryServices bqServices) {
+ this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken");
+ this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir");
+ this.bqServices = checkNotNull(bqServices, "bqServices");
+ }
+
+ @Override
+ public List<BoundedSource<TableRow>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+ TableReference tableToExtract = getTableToExtract(bqOptions);
+ JobService jobService = bqServices.getJobService(bqOptions);
+ String extractJobId = getExtractJobId(jobIdToken);
+ List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);
+
+ TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable(
+ tableToExtract.getProjectId(),
+ tableToExtract.getDatasetId(),
+ tableToExtract.getTableId()).getSchema();
+
+ cleanupTempResource(bqOptions);
+ return createSources(tempFiles, tableSchema);
+ }
+
+ protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
+
+ protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return false;
+ }
+
+ @Override
+ public void validate() {
+ // Do nothing, validation is done in BigQuery.Read.
+ }
+
+ @Override
+ public Coder<TableRow> getDefaultOutputCoder() {
+ return TableRowJsonCoder.of();
+ }
+
+ private List<String> executeExtract(
+ String jobId, TableReference table, JobService jobService)
+ throws InterruptedException, IOException {
+ JobReference jobRef = new JobReference()
+ .setProjectId(table.getProjectId())
+ .setJobId(jobId);
+
+ String destinationUri = getExtractDestinationUri(extractDestinationDir);
+ JobConfigurationExtract extract = new JobConfigurationExtract()
+ .setSourceTable(table)
+ .setDestinationFormat("AVRO")
+ .setDestinationUris(ImmutableList.of(destinationUri));
+
+ LOG.info("Starting BigQuery extract job: {}", jobId);
+ jobService.startExtractJob(jobRef, extract);
+ Job extractJob =
+ jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
+ if (parseStatus(extractJob) != Status.SUCCEEDED) {
+ throw new IOException(String.format(
+ "Extract job %s failed, status: %s",
+ extractJob.getJobReference().getJobId(), extractJob.getStatus()));
+ }
+
+ List<String> tempFiles = getExtractFilePaths(extractDestinationDir, extractJob);
+ return ImmutableList.copyOf(tempFiles);
+ }
+
+ private List<BoundedSource<TableRow>> createSources(
+ List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
+ final String jsonSchema = JSON_FACTORY.toString(tableSchema);
+
+ SerializableFunction<GenericRecord, TableRow> function =
+ new SerializableFunction<GenericRecord, TableRow>() {
+ @Override
+ public TableRow apply(GenericRecord input) {
+ try {
+ return AvroUtils.convertGenericRecordToTableRow(
+ input, JSON_FACTORY.fromString(jsonSchema, TableSchema.class));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to convert GenericRecord to TableRow", e);
+ }
+ }};
+
+ List<BoundedSource<TableRow>> avroSources = Lists.newArrayList();
+ BackOff backoff = new AttemptBoundedExponentialBackOff(
+ MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS);
+ for (String fileName : files) {
+ while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) {
+ if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) {
+ break;
+ }
+ }
+ avroSources.add(new TransformingSource<>(
+ AvroSource.from(fileName), function, getDefaultOutputCoder()));
+ }
+ return ImmutableList.copyOf(avroSources);
+ }
+
+ protected static class BigQueryReader extends BoundedSource.BoundedReader<TableRow> {
+ private final BigQuerySourceBase source;
+ private final BigQueryServices.BigQueryJsonReader reader;
+
+ private BigQueryReader(
+ BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
+ this.source = source;
+ this.reader = reader;
+ }
+
+ @Override
+ public BoundedSource<TableRow> getCurrentSource() {
+ return source;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return reader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return reader.advance();
+ }
+
+ @Override
+ public TableRow getCurrent() throws NoSuchElementException {
+ return reader.getCurrent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ }
+ }
+ }
+
+ /**
+ * A {@link BoundedSource} that reads from {@code BoundedSource<T>}
+ * and transforms elements to type {@code V}.
+ */
+ @VisibleForTesting
+ static class TransformingSource<T, V> extends BoundedSource<V> {
+ private final BoundedSource<T> boundedSource;
+ private final SerializableFunction<T, V> function;
+ private final Coder<V> outputCoder;
+
+ TransformingSource(
+ BoundedSource<T> boundedSource,
+ SerializableFunction<T, V> function,
+ Coder<V> outputCoder) {
+ this.boundedSource = boundedSource;
+ this.function = function;
+ this.outputCoder = outputCoder;
+ }
+
+ @Override
+ public List<? extends BoundedSource<V>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+ return Lists.transform(
+ boundedSource.splitIntoBundles(desiredBundleSizeBytes, options),
+ new Function<BoundedSource<T>, BoundedSource<V>>() {
+ @Override
+ public BoundedSource<V> apply(BoundedSource<T> input) {
+ return new TransformingSource<>(input, function, outputCoder);
+ }
+ });
+ }
+
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+ return boundedSource.getEstimatedSizeBytes(options);
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+ return boundedSource.producesSortedKeys(options);
+ }
+
+ @Override
+ public BoundedReader<V> createReader(PipelineOptions options) throws IOException {
+ return new TransformingReader(boundedSource.createReader(options));
+ }
+
+ @Override
+ public void validate() {
+ boundedSource.validate();
+ }
+
+ @Override
+ public Coder<V> getDefaultOutputCoder() {
+ return outputCoder;
+ }
+
+ private class TransformingReader extends BoundedReader<V> {
+ private final BoundedReader<T> boundedReader;
+
+ private TransformingReader(BoundedReader<T> boundedReader) {
+ this.boundedReader = boundedReader;
+ }
+
+ @Override
+ public synchronized BoundedSource<V> getCurrentSource() {
+ return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder);
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return boundedReader.start();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ return boundedReader.advance();
+ }
+
+ @Override
+ public V getCurrent() throws NoSuchElementException {
+ T current = boundedReader.getCurrent();
+ return function.apply(current);
+ }
+
+ @Override
+ public void close() throws IOException {
+ boundedReader.close();
+ }
+
+ @Override
+ public synchronized BoundedSource<V> splitAtFraction(double fraction) {
+ return new TransformingSource<>(
+ boundedReader.splitAtFraction(fraction), function, outputCoder);
+ }
+
+ @Override
+ public Double getFractionConsumed() {
+ return boundedReader.getFractionConsumed();
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return boundedReader.getCurrentTimestamp();
+ }
+ }
+ }
+
+ private static String getExtractJobId(String jobIdToken) {
+ return jobIdToken + "-extract";
+ }
+
+ private static String getExtractDestinationUri(String extractDestinationDir) {
+ return String.format("%s/%s", extractDestinationDir, "*.avro");
+ }
+
+ private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob)
+ throws IOException {
+ JobStatistics jobStats = extractJob.getStatistics();
+ List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
+ if (counts.size() != 1) {
+ String errorMessage = (counts.size() == 0 ?
+ "No destination uri file count received." :
+ String.format("More than one destination uri file count received. First two are %s, %s",
+ counts.get(0), counts.get(1)));
+ throw new RuntimeException(errorMessage);
+ }
+ long filesCount = counts.get(0);
+
+ ImmutableList.Builder<String> paths = ImmutableList.builder();
+ IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
+ for (long i = 0; i < filesCount; ++i) {
+ String filePath =
+ factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
+ paths.add(filePath);
+ }
+ return paths.build();
+ }
+
/////////////////////////////////////////////////////////////////////////////
/**
@@ -799,8 +1465,15 @@ public class BigQueryIO {
*/
@Deprecated
public Bound() {
- this(null, null, null, null, CreateDisposition.CREATE_IF_NEEDED,
- WriteDisposition.WRITE_EMPTY, true, null);
+ this(
+ null /* name */,
+ null /* jsonTableRef */,
+ null /* tableRefFunction */,
+ null /* jsonSchema */,
+ CreateDisposition.CREATE_IF_NEEDED,
+ WriteDisposition.WRITE_EMPTY,
+ true /* validate */,
+ null /* testBigQueryServices */);
}
private Bound(String name, @Nullable String jsonTableRef,
@@ -1032,7 +1705,7 @@ public class BigQueryIO {
if (Strings.isNullOrEmpty(table.getProjectId())) {
table.setProjectId(options.getProject());
}
- String jobIdToken = UUID.randomUUID().toString();
+ String jobIdToken = randomUUIDString();
String tempLocation = options.getTempLocation();
String tempFilePrefix;
try {
@@ -1181,7 +1854,7 @@ public class BigQueryIO {
// The maximum number of retries to poll the status of a load job.
// It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
- private static final int MAX_JOB_STATUS_POLL_RETRIES = Integer.MAX_VALUE;
+ private static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
private final BigQuerySink bigQuerySink;
@@ -1237,13 +1910,13 @@ public class BigQueryIO {
@Nullable TableSchema schema,
WriteDisposition writeDisposition,
CreateDisposition createDisposition) throws InterruptedException, IOException {
- JobConfigurationLoad loadConfig = new JobConfigurationLoad();
- loadConfig.setSourceUris(gcsUris);
- loadConfig.setDestinationTable(ref);
- loadConfig.setSchema(schema);
- loadConfig.setWriteDisposition(writeDisposition.name());
- loadConfig.setCreateDisposition(createDisposition.name());
- loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON");
+ JobConfigurationLoad loadConfig = new JobConfigurationLoad()
+ .setSourceUris(gcsUris)
+ .setDestinationTable(ref)
+ .setSchema(schema)
+ .setWriteDisposition(writeDisposition.name())
+ .setCreateDisposition(createDisposition.name())
+ .setSourceFormat("NEWLINE_DELIMITED_JSON");
boolean retrying = false;
String projectId = ref.getProjectId();
@@ -1253,9 +1926,12 @@ public class BigQueryIO {
LOG.info("Previous load jobs failed, retrying.");
}
LOG.info("Starting BigQuery load job: {}", jobId);
- jobService.startLoadJob(jobId, loadConfig);
- Status jobStatus = parseStatus(
- jobService.pollJob(projectId, jobId, MAX_JOB_STATUS_POLL_RETRIES));
+ JobReference jobRef = new JobReference()
+ .setProjectId(projectId)
+ .setJobId(jobId);
+ jobService.startLoadJob(jobRef, loadConfig);
+ Status jobStatus =
+ parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES));
switch (jobStatus) {
case SUCCEEDED:
return;
@@ -1711,7 +2387,7 @@ public class BigQueryIO {
UNKNOWN,
}
- private static Status parseStatus(@Nullable Job job) {
+ private static Status parseStatus(Job job) {
if (job == null) {
return Status.UNKNOWN;
}
@@ -1725,7 +2401,8 @@ public class BigQueryIO {
}
}
- private static String toJsonString(Object item) {
+ @VisibleForTesting
+ static String toJsonString(Object item) {
if (item == null) {
return null;
}
@@ -1738,7 +2415,8 @@ public class BigQueryIO {
}
}
- private static <T> T fromJsonString(String json, Class<T> clazz) {
+ @VisibleForTesting
+ static <T> T fromJsonString(String json, Class<T> clazz) {
if (json == null) {
return null;
}
@@ -1751,51 +2429,20 @@ public class BigQueryIO {
}
}
- /////////////////////////////////////////////////////////////////////////////
-
- /** Disallow construction of utility class. */
- private BigQueryIO() {}
-
/**
- * Direct mode read evaluator.
+ * Returns a random UUID string.
*
- * <p>This loads the entire table into an in-memory PCollection.
+ * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset IDs.
*/
- private static void evaluateReadHelper(
- Read.Bound transform, DirectPipelineRunner.EvaluationContext context) {
- BigQueryOptions options = context.getPipelineOptions();
- Bigquery client = Transport.newBigQueryClient(options).build();
- if (transform.table != null && transform.table.getProjectId() == null) {
- transform.table.setProjectId(options.getProject());
- }
-
- BigQueryTableRowIterator iterator;
- if (transform.query != null) {
- LOG.info("Reading from BigQuery query {}", transform.query);
- iterator =
- BigQueryTableRowIterator.fromQuery(
- transform.query, options.getProject(), client, transform.getFlattenResults());
- } else {
- LOG.info("Reading from BigQuery table {}", toTableSpec(transform.table));
- iterator = BigQueryTableRowIterator.fromTable(transform.table, client);
- }
-
- try (BigQueryTableRowIterator ignored = iterator) {
- List<TableRow> elems = new ArrayList<>();
- iterator.open();
- while (iterator.advance()) {
- elems.add(iterator.getCurrent());
- }
- LOG.info("Number of records read from BigQuery: {}", elems.size());
- context.setPCollection(context.getOutput(transform), elems);
- } catch (IOException | InterruptedException e) {
- if (e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
- throw new RuntimeException(e);
- }
+ private static String randomUUIDString() {
+ return UUID.randomUUID().toString().replaceAll("-", "");
}
+ /////////////////////////////////////////////////////////////////////////////
+
+ /** Disallow construction of utility class. */
+ private BigQueryIO() {}
+
private static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
List<V> value = map.get(key);
if (value == null) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
index b12e049..f82edf4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java
@@ -19,13 +19,22 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.BigQueryOptions;
+import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
import java.io.IOException;
import java.io.Serializable;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
/**
* An interface for real, mock, or fake implementations of Cloud BigQuery services.
@@ -38,25 +47,40 @@ public interface BigQueryServices extends Serializable {
public JobService getJobService(BigQueryOptions bqOptions);
/**
+ * Returns a real, mock, or fake {@link DatasetService}.
+ */
+ public DatasetService getDatasetService(BigQueryOptions bqOptions);
+
+ /**
+ * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables.
+ */
+ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef);
+
+ /**
+ * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables.
+ */
+ public BigQueryJsonReader getReaderFromQuery(
+ BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten);
+
+ /**
* An interface for the Cloud BigQuery load service.
*/
public interface JobService {
/**
- * Starts a BigQuery load job.
+ * Start a BigQuery load job.
*/
- void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
+ void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
throws InterruptedException, IOException;
-
/**
* Start a BigQuery extract job.
*/
- void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+ void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException;
/**
- * Start a BigQuery extract job.
+ * Start a BigQuery query job.
*/
- void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
+ void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
throws IOException, InterruptedException;
/**
@@ -64,7 +88,78 @@ public interface BigQueryServices extends Serializable {
*
* <p>Returns null if {@code maxAttempts} retries are reached.
*/
- Job pollJob(String projectId, String jobId, int maxAttempts)
+ Job pollJob(JobReference jobRef, int maxAttempts)
throws InterruptedException, IOException;
+
+ /**
+ * Dry runs the query in the given project.
+ */
+ JobStatistics dryRunQuery(String projectId, String query)
+ throws InterruptedException, IOException;
+ }
+
+ /**
+ * An interface to get, create and delete Cloud BigQuery datasets and tables.
+ */
+ public interface DatasetService {
+ /**
+ * Gets the specified {@link Table} resource by table ID.
+ */
+ Table getTable(String projectId, String datasetId, String tableId)
+ throws InterruptedException, IOException;
+
+ /**
+ * Deletes the table specified by tableId from the dataset.
+ * If the table contains data, all the data will be deleted.
+ */
+ void deleteTable(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException;
+
+ /**
+ * Create a {@link Dataset} with the given {@code location} and {@code description}.
+ */
+ void createDataset(String projectId, String datasetId, String location, String description)
+ throws IOException, InterruptedException;
+
+ /**
+ * Deletes the dataset specified by the datasetId value.
+ *
+ * <p>Before you can delete a dataset, you must delete all its tables.
+ */
+ void deleteDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException;
+ }
+
+ /**
+ * An interface to read Cloud BigQuery data directly.
+ */
+ public interface BigQueryJsonReader {
+ /**
+ * Initializes the reader and advances the reader to the first record.
+ */
+ boolean start() throws IOException;
+
+ /**
+ * Advances the reader to the next valid record.
+ */
+ boolean advance() throws IOException;
+
+ /**
+ * Returns the value of the data item that was read by the last {@link #start} or
+ * {@link #advance} call. The returned value must be effectively immutable and remain valid
+ * indefinitely.
+ *
+ * <p>Multiple calls to this method without an intervening call to {@link #advance} should
+ * return the same result.
+ *
+ * @throws java.util.NoSuchElementException if {@link #start} was never called, or if
+ * the last {@link #start} or {@link #advance} returned {@code false}.
+ */
+ TableRow getCurrent() throws NoSuchElementException;
+
+ /**
+ * Closes the reader. The reader cannot be used after this method is called.
+ */
+ void close() throws IOException;
}
}
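BigQueryServices exists so tests can substitute mocks or fakes for the real service (see the withTestServices hook in the BigQueryIO.java diff above). As an illustration of what the new BigQueryJsonReader contract admits, here is a hypothetical fake that serves rows from memory; the class name and wiring are illustrative only, not part of this commit.

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.util.BigQueryServices;

    import java.util.List;
    import java.util.NoSuchElementException;

    // A fake BigQueryJsonReader backed by an in-memory list, for unit tests.
    class FakeJsonReader implements BigQueryServices.BigQueryJsonReader {
      private final List<TableRow> rows;
      private int index = -1;

      FakeJsonReader(List<TableRow> rows) {
        this.rows = rows;
      }

      @Override
      public boolean start() {
        // Per the contract, start() also advances to the first record.
        return advance();
      }

      @Override
      public boolean advance() {
        return ++index < rows.size();
      }

      @Override
      public TableRow getCurrent() {
        if (index < 0 || index >= rows.size()) {
          throw new NoSuchElementException();
        }
        return rows.get(index);
      }

      @Override
      public void close() {}
    }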
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/987350b7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
index 2bfe84f..01ea45f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
@@ -20,17 +20,24 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.BigQueryOptions;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.googleapis.services.AbstractGoogleClientRequest;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.bigquery.model.Dataset;
+import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.annotations.VisibleForTesting;
@@ -38,14 +45,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+
/**
* An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery
* service.
*/
public class BigQueryServicesImpl implements BigQueryServices {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class);
+
// The maximum number of attempts to execute a BigQuery RPC.
private static final int MAX_RPC_ATTEMPTS = 10;
@@ -53,17 +65,31 @@ public class BigQueryServicesImpl implements BigQueryServices {
private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
// The initial backoff for polling the status of a BigQuery job.
- private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60);
+ private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1);
@Override
public JobService getJobService(BigQueryOptions options) {
return new JobServiceImpl(options);
}
+ @Override
+ public DatasetService getDatasetService(BigQueryOptions options) {
+ return new DatasetServiceImpl(options);
+ }
+
+ @Override
+ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) {
+ return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef);
+ }
+
+ @Override
+ public BigQueryJsonReader getReaderFromQuery(
+ BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) {
+ return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten);
+ }
+
@VisibleForTesting
static class JobServiceImpl implements BigQueryServices.JobService {
- private static final Logger LOG = LoggerFactory.getLogger(JobServiceImpl.class);
-
private final ApiErrorExtractor errorExtractor;
private final Bigquery client;
@@ -81,22 +107,17 @@ public class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC retries.
+ * @throws IOException if it exceeds max RPC retries.
*/
@Override
public void startLoadJob(
- String jobId,
+ JobReference jobRef,
JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
- Job job = new Job();
- JobReference jobRef = new JobReference();
- jobRef.setProjectId(loadConfig.getDestinationTable().getProjectId());
- jobRef.setJobId(jobId);
- job.setJobReference(jobRef);
- JobConfiguration jobConfig = new JobConfiguration();
- jobConfig.setLoad(loadConfig);
- job.setConfiguration(jobConfig);
+ Job job = new Job()
+ .setJobReference(jobRef)
+ .setConfiguration(new JobConfiguration().setLoad(loadConfig));
startJob(job, errorExtractor, client);
}
@@ -104,21 +125,17 @@ public class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC retries.
+ * @throws IOException if it exceeds max RPC retries.
*/
@Override
- public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+ public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
throws InterruptedException, IOException {
- Job job = new Job();
- JobReference jobRef = new JobReference();
- jobRef.setProjectId(extractConfig.getSourceTable().getProjectId());
- jobRef.setJobId(jobId);
- job.setJobReference(jobRef);
- JobConfiguration jobConfig = new JobConfiguration();
- jobConfig.setExtract(extractConfig);
- job.setConfiguration(jobConfig);
+ Job job = new Job()
+ .setJobReference(jobRef)
+ .setConfiguration(
+ new JobConfiguration().setExtract(extractConfig));
startJob(job, errorExtractor, client);
}
@@ -126,22 +143,17 @@ public class BigQueryServicesImpl implements BigQueryServices {
/**
* {@inheritDoc}
*
- * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
*
- * @throws IOException if it exceeds max RPC retries.
+ * @throws IOException if it exceeds max RPC retries.
*/
@Override
- public void startQueryJob(String jobId, JobConfigurationQuery queryConfig, boolean dryRun)
+ public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
throws IOException, InterruptedException {
- Job job = new Job();
- JobReference jobRef = new JobReference();
- jobRef.setProjectId(queryConfig.getDestinationTable().getProjectId());
- jobRef.setJobId(jobId);
- job.setJobReference(jobRef);
- JobConfiguration jobConfig = new JobConfiguration();
- jobConfig.setQuery(queryConfig);
- jobConfig.setDryRun(dryRun);
- job.setConfiguration(jobConfig);
+ Job job = new Job()
+ .setJobReference(jobRef)
+ .setConfiguration(
+ new JobConfiguration().setQuery(queryConfig));
startJob(job, errorExtractor, client);
}
@@ -160,8 +172,7 @@ public class BigQueryServicesImpl implements BigQueryServices {
ApiErrorExtractor errorExtractor,
Bigquery client,
Sleeper sleeper,
- BackOff backoff)
- throws InterruptedException, IOException {
+ BackOff backoff) throws IOException, InterruptedException {
JobReference jobRef = job.getJobReference();
Exception lastException = null;
do {
@@ -183,28 +194,27 @@ public class BigQueryServicesImpl implements BigQueryServices {
} while (nextBackOff(sleeper, backoff));
throw new IOException(
String.format(
- "Unable to insert job: %s, aborting after %d retries.",
+ "Unable to insert job: %s, aborting after %d .",
jobRef.getJobId(), MAX_RPC_ATTEMPTS),
lastException);
}
@Override
- public Job pollJob(String projectId, String jobId, int maxAttempts)
+ public Job pollJob(JobReference jobRef, int maxAttempts)
throws InterruptedException {
BackOff backoff = new AttemptBoundedExponentialBackOff(
maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
- return pollJob(projectId, jobId, Sleeper.DEFAULT, backoff);
+ return pollJob(jobRef, Sleeper.DEFAULT, backoff);
}
@VisibleForTesting
Job pollJob(
- String projectId,
- String jobId,
+ JobReference jobRef,
Sleeper sleeper,
BackOff backoff) throws InterruptedException {
do {
try {
- Job job = client.jobs().get(projectId, jobId).execute();
+ Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
JobStatus status = job.getStatus();
if (status != null && status.getState() != null && status.getState().equals("DONE")) {
return job;
@@ -215,21 +225,254 @@ public class BigQueryServicesImpl implements BigQueryServices {
LOG.warn("Ignore the error and retry polling job status.", e);
}
} while (nextBackOff(sleeper, backoff));
- LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId);
+ LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId());
return null;
}
+ @Override
+ public JobStatistics dryRunQuery(String projectId, String query)
+ throws InterruptedException, IOException {
+ Job job = new Job()
+ .setConfiguration(new JobConfiguration()
+ .setQuery(new JobConfigurationQuery()
+ .setQuery(query))
+ .setDryRun(true));
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ return executeWithRetries(
+ client.jobs().insert(projectId, job),
+ String.format(
+ "Unable to dry run query: %s, aborting after %d retries.",
+ query, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff).getStatistics();
+ }
+ }
+
+ @VisibleForTesting
+ static class DatasetServiceImpl implements DatasetService {
+ private final ApiErrorExtractor errorExtractor;
+ private final Bigquery client;
+
+ @VisibleForTesting
+ DatasetServiceImpl(Bigquery client) {
+ this.errorExtractor = new ApiErrorExtractor();
+ this.client = client;
+ }
+
+ private DatasetServiceImpl(BigQueryOptions bqOptions) {
+ this.errorExtractor = new ApiErrorExtractor();
+ this.client = Transport.newBigQueryClient(bqOptions).build();
+ }
+
/**
- * Identical to {@link BackOffUtils#next} but without checked IOException.
- * @throws InterruptedException
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds max RPC retries.
*/
- private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
- throws InterruptedException {
+ @Override
+ public Table getTable(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ return executeWithRetries(
+ client.tables().get(projectId, datasetId, tableId),
+ String.format(
+ "Unable to get table: %s, aborting after %d retries.",
+ tableId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds max RPC retries.
+ */
+ @Override
+ public void deleteTable(String projectId, String datasetId, String tableId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ executeWithRetries(
+ client.tables().delete(projectId, datasetId, tableId),
+ String.format(
+ "Unable to delete table: %s, aborting after %d retries.",
+ tableId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds max RPC retries.
+ */
+ @Override
+ public void createDataset(
+ String projectId, String datasetId, String location, String description)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff);
+ }
+
+ @VisibleForTesting
+ void createDataset(
+ String projectId,
+ String datasetId,
+ String location,
+ String description,
+ Sleeper sleeper,
+ BackOff backoff) throws IOException, InterruptedException {
+ DatasetReference datasetRef = new DatasetReference()
+ .setProjectId(projectId)
+ .setDatasetId(datasetId);
+
+ Dataset dataset = new Dataset()
+ .setDatasetReference(datasetRef)
+ .setLocation(location)
+ .setFriendlyName(location)
+ .setDescription(description);
+
+ Exception lastException;
+ do {
+ try {
+ client.datasets().insert(projectId, dataset).execute();
+ return; // SUCCEEDED
+ } catch (GoogleJsonResponseException e) {
+ if (errorExtractor.itemAlreadyExists(e)) {
+ return; // SUCCEEDED
+ }
+ // ignore and retry
+ LOG.warn("Ignore the error and retry creating the dataset.", e);
+ lastException = e;
+ } catch (IOException e) {
+ LOG.warn("Ignore the error and retry creating the dataset.", e);
+ lastException = e;
+ }
+ } while (nextBackOff(sleeper, backoff));
+ throw new IOException(
+ String.format(
+ "Unable to create dataset: %s, aborting after %d .",
+ datasetId, MAX_RPC_ATTEMPTS),
+ lastException);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds.
+ *
+ * @throws IOException if it exceeds max RPC retries.
+ */
+ @Override
+ public void deleteDataset(String projectId, String datasetId)
+ throws IOException, InterruptedException {
+ BackOff backoff =
+ new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+ executeWithRetries(
+ client.datasets().delete(projectId, datasetId),
+ String.format(
+ "Unable to delete table: %s, aborting after %d retries.",
+ datasetId, MAX_RPC_ATTEMPTS),
+ Sleeper.DEFAULT,
+ backoff);
+ }
+ }
+
+ private static class BigQueryJsonReaderImpl implements BigQueryJsonReader {
+ BigQueryTableRowIterator iterator;
+
+ private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) {
+ this.iterator = iterator;
+ }
+
+ private static BigQueryJsonReader fromQuery(
+ BigQueryOptions bqOptions,
+ String query,
+ String projectId,
+ @Nullable Boolean flattenResults) {
+ return new BigQueryJsonReaderImpl(
+ BigQueryTableRowIterator.fromQuery(
+ query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults));
+ }
+
+ private static BigQueryJsonReader fromTable(
+ BigQueryOptions bqOptions,
+ TableReference tableRef) {
+ return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable(
+ tableRef, Transport.newBigQueryClient(bqOptions).build()));
+ }
+
+ @Override
+ public boolean start() throws IOException {
try {
- return BackOffUtils.next(sleeper, backoff);
+ iterator.open();
+ return iterator.advance();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during start() operation", e);
+ }
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ try {
+ return iterator.advance();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during advance() operation", e);
+ }
+ }
+
+ @Override
+ public TableRow getCurrent() throws NoSuchElementException {
+ return iterator.getCurrent();
+ }
+
+ @Override
+ public void close() throws IOException {
+ iterator.close();
+ }
+ }
+
+ @VisibleForTesting
+ static <T> T executeWithRetries(
+ AbstractGoogleClientRequest<T> request,
+ String errorMessage,
+ Sleeper sleeper,
+ BackOff backoff)
+ throws IOException, InterruptedException {
+ Exception lastException = null;
+ do {
+ try {
+ return request.execute();
} catch (IOException e) {
- throw new RuntimeException(e);
+ LOG.warn("Ignore the error and retry the request.", e);
+ lastException = e;
}
+ } while (nextBackOff(sleeper, backoff));
+ throw new IOException(
+ errorMessage,
+ lastException);
+ }
+
+ /**
+ * Identical to {@link BackOffUtils#next} but without checked IOException.
+ * @throws InterruptedException
+ */
+ private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
+ try {
+ return BackOffUtils.next(sleeper, backoff);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
}
}
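Taken together, executeWithRetries, nextBackOff, and AttemptBoundedExponentialBackOff implement one pattern: attempt the RPC, log and swallow the failure, sleep with growing backoff, and give up with the last exception attached once the attempt bound is hit. A dependency-free sketch of that loop; the doubling growth factor and the Operation stand-in are simplifications, not the SDK's actual classes.

    import java.io.IOException;
    import java.util.concurrent.TimeUnit;

    public class RetrySketch {
      /** Stand-in for AbstractGoogleClientRequest#execute. */
      interface Operation<T> {
        T execute() throws IOException;
      }

      // Attempt-bounded backoff: try up to maxAttempts times, doubling the
      // sleep between attempts, and rethrow the last failure with context.
      static <T> T executeWithRetries(
          Operation<T> op, String errorMessage, int maxAttempts, long initialBackoffMillis)
          throws IOException, InterruptedException {
        IOException lastException = null;
        long backoffMillis = initialBackoffMillis;
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
          try {
            return op.execute();
          } catch (IOException e) {
            lastException = e; // ignore the error and retry, as the impl does
          }
          if (attempt < maxAttempts) {
            TimeUnit.MILLISECONDS.sleep(backoffMillis);
            backoffMillis *= 2;
          }
        }
        throw new IOException(errorMessage, lastException);
      }
    }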