You are viewing a plain text version of this content. The canonical link for it was provided as a hyperlink ("here"), which is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/04/04 21:20:30 UTC
[beam] branch master updated: [BEAM-3774] Adds support for reading
from/writing to more BQ geographical locations (#5001)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 20405c3 [BEAM-3774] Adds support for reading from/writing to more BQ geographical locations (#5001)
20405c3 is described below
commit 20405c3eb5d5a58176ab93e62fa730f76758e208
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Wed Apr 4 14:20:23 2018 -0700
[BEAM-3774] Adds support for reading from/writing to more BQ geographical locations (#5001)
* Adds support for reading from/writing to BigQuery datasets that are not in US or EU locations.
* Addressing reviewer comments.
---
.../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java | 21 ++++++++-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 28 ++++++++++-
.../sdk/io/gcp/bigquery/BigQueryQuerySource.java | 54 +++++++++++++++-------
.../beam/sdk/io/gcp/bigquery/BigQueryServices.java | 2 +-
.../sdk/io/gcp/bigquery/BigQueryServicesImpl.java | 10 ++--
.../sdk/io/gcp/bigquery/BigQuerySourceBase.java | 17 ++++---
.../beam/sdk/io/gcp/bigquery/WriteTables.java | 11 +++--
.../sdk/io/gcp/bigquery/BigQueryIOReadTest.java | 6 ++-
.../beam/sdk/io/gcp/bigquery/FakeJobService.java | 2 +-
9 files changed, 115 insertions(+), 36 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 96a0622..29b405b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import static com.google.common.base.Preconditions.checkState;
+import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableReference;
@@ -203,12 +204,28 @@ public class BigQueryHelpers {
} else {
throw new RuntimeException(
String.format(
- UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)),
- e);
+ UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)), e);
}
}
}
+ static String getDatasetLocation(
+ DatasetService datasetService, String projectId, String datasetId) {
+ Dataset dataset;
+ try {
+ dataset = datasetService.getDataset(projectId, datasetId);
+ } catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new RuntimeException(
+ String.format(
+ "unable to obtain dataset for dataset %s in project %s", datasetId, projectId),
+ e);
+ }
+ return dataset.getLocation();
+ }
+
static void verifyTablePresence(DatasetService datasetService, TableReference table) {
try {
datasetService.getTable(table);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 88de9b4..fab238c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -170,6 +170,13 @@ import org.slf4j.LoggerFactory;
* .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
* }</pre>
*
+ * <p>Users can optionally specify a query priority using {@link TypedRead#withQueryPriority(
+ * TypedRead.QueryPriority)} and a geographic location where the query will be executed using {@link
+ * TypedRead#withQueryLocation(String)}. Query location must be specified for jobs that are not
+ * executed in US or EU. See <a
+ * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs:
+ * query</a>.
+ *
* <h3>Writing</h3>
*
* <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes a
@@ -549,6 +556,7 @@ public class BigQueryIO {
abstract Builder<T> setWithTemplateCompatibility(Boolean useTemplateCompatibility);
abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
abstract Builder<T> setQueryPriority(QueryPriority priority);
+ abstract Builder<T> setQueryLocation(String location);
abstract TypedRead<T> build();
abstract Builder<T> setParseFn(
@@ -570,6 +578,8 @@ public class BigQueryIO {
@Nullable abstract QueryPriority getQueryPriority();
+ @Nullable abstract String getQueryLocation();
+
@Nullable abstract Coder<T> getCoder();
/**
@@ -632,7 +642,8 @@ public class BigQueryIO {
getBigQueryServices(),
coder,
getParseFn(),
- MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH));
+ MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
+ getQueryLocation());
}
return source;
}
@@ -687,7 +698,8 @@ public class BigQueryIO {
new JobConfigurationQuery()
.setQuery(getQuery().get())
.setFlattenResults(getFlattenResults())
- .setUseLegacySql(getUseLegacySql()));
+ .setUseLegacySql(getUseLegacySql()),
+ getQueryLocation());
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
@@ -939,6 +951,18 @@ public class BigQueryIO {
return toBuilder().setQueryPriority(priority).build();
}
+ /**
+ * BigQuery geographic location where the query <a
+ * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs">job</a> will be
+ * executed. If not specified, Beam tries to determine the location by examining the tables
+ * referenced by the query. Location must be specified for queries not executed in US or EU. See
+ * <a href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs:
+ * query</a>.
+ */
+ public TypedRead<T> withQueryLocation(String location) {
+ return toBuilder().setQueryLocation(location).build();
+ }
+
@Experimental(Experimental.Kind.SOURCE_SINK)
public TypedRead<T> withTemplateCompatibility() {
return toBuilder().setWithTemplateCompatibility(true).build();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 34d7c68..f380b7d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -59,9 +59,18 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
BigQueryServices bqServices,
Coder<T> coder,
SerializableFunction<SchemaAndRecord, T> parseFn,
- QueryPriority priority) {
+ QueryPriority priority,
+ String location) {
return new BigQueryQuerySource<>(
- stepUuid, query, flattenResults, useLegacySql, bqServices, coder, parseFn, priority);
+ stepUuid,
+ query,
+ flattenResults,
+ useLegacySql,
+ bqServices,
+ coder,
+ parseFn,
+ priority,
+ location);
}
private final ValueProvider<String> query;
@@ -69,6 +78,7 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
private final Boolean useLegacySql;
private transient AtomicReference<JobStatistics> dryRunJobStats;
private final QueryPriority priority;
+ private final String location;
private BigQueryQuerySource(
String stepUuid,
@@ -78,13 +88,15 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
BigQueryServices bqServices,
Coder<T> coder,
SerializableFunction<SchemaAndRecord, T> parseFn,
- QueryPriority priority) {
+ QueryPriority priority,
+ String location) {
super(stepUuid, bqServices, coder, parseFn);
this.query = checkNotNull(query, "query");
this.flattenResults = checkNotNull(flattenResults, "flattenResults");
this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
this.dryRunJobStats = new AtomicReference<>();
this.priority = priority;
+ this.location = location;
}
@Override
@@ -97,13 +109,17 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
protected TableReference getTableToExtract(BigQueryOptions bqOptions)
throws IOException, InterruptedException {
// 1. Find the location of the query.
- String location = null;
- List<TableReference> referencedTables =
- dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+ String location = this.location;
DatasetService tableService = bqServices.getDatasetService(bqOptions);
- if (referencedTables != null && !referencedTables.isEmpty()) {
- TableReference queryTable = referencedTables.get(0);
- location = tableService.getTable(queryTable).getLocation();
+ if (location == null) {
+ // If location was not provided we try to determine it from the tables referenced by the
+ // Query. This will only work for BQ locations US and EU.
+ List<TableReference> referencedTables =
+ dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+ if (referencedTables != null && !referencedTables.isEmpty()) {
+ TableReference queryTable = referencedTables.get(0);
+ location = tableService.getTable(queryTable).getLocation();
+ }
}
String jobIdToken = createJobIdToken(bqOptions.getJobName(), stepUuid);
@@ -125,7 +141,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
// 3. Execute the query.
executeQuery(
- jobIdToken, bqOptions.getProject(), tableToExtract, bqServices.getJobService(bqOptions));
+ jobIdToken, bqOptions.getProject(), tableToExtract, bqServices.getJobService(bqOptions),
+ location);
return tableToExtract;
}
@@ -151,8 +168,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
throws InterruptedException, IOException {
if (dryRunJobStats.get() == null) {
- JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
- bqOptions.getProject(), createBasicQueryConfig());
+ JobStatistics jobStats =
+ bqServices
+ .getJobService(bqOptions)
+ .dryRunQuery(bqOptions.getProject(), createBasicQueryConfig(), this.location);
dryRunJobStats.compareAndSet(null, jobStats);
}
return dryRunJobStats.get();
@@ -162,7 +181,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
String jobIdToken,
String executingProject,
TableReference destinationTable,
- JobService jobService) throws IOException, InterruptedException {
+ JobService jobService,
+ String bqLocation) throws IOException, InterruptedException {
// Generate a transient (random) query job ID, because this code may be retried after the
// temporary dataset and table have already been deleted by a previous attempt -
// in that case we want to re-generate the temporary dataset and table, and we'll need
@@ -174,9 +194,11 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
destinationTable,
queryJobId);
- JobReference jobRef = new JobReference()
- .setProjectId(executingProject)
- .setJobId(queryJobId);
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(executingProject)
+ .setLocation(bqLocation)
+ .setJobId(queryJobId);
JobConfigurationQuery queryConfig = createBasicQueryConfig()
.setAllowLargeResults(true)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index dde005d..1295cc0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -85,7 +85,7 @@ interface BigQueryServices extends Serializable {
/**
* Dry runs the query in the given project.
*/
- JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig)
+ JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig, String location)
throws InterruptedException, IOException;
/**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 6c76688..9771733 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -254,7 +254,9 @@ class BigQueryServicesImpl implements BigQueryServices {
BackOff backoff) throws InterruptedException {
do {
try {
- Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
+ Job job = client.jobs().get(
+ jobRef.getProjectId(), jobRef.getJobId()).setLocation(
+ jobRef.getLocation()).execute();
JobStatus status = job.getStatus();
if (status != null && status.getState() != null && status.getState().equals("DONE")) {
LOG.info("BigQuery job {} completed in state DONE", jobRef);
@@ -281,9 +283,11 @@ class BigQueryServicesImpl implements BigQueryServices {
}
@Override
- public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig)
+ public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig,
+ String location)
throws InterruptedException, IOException {
- Job job = new Job()
+ JobReference jobRef = new JobReference().setLocation(location).setProjectId(projectId);
+ Job job = new Job().setJobReference(jobRef)
.setConfiguration(new JobConfiguration()
.setQuery(queryConfig)
.setDryRun(true));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a15afed..6d82c56 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -101,7 +101,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
- Table table = bqServices.getDatasetService(bqOptions).getTable(tableToExtract);
+ BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions);
+ Table table = datasetService.getTable(tableToExtract);
if (table == null) {
throw new IOException(String.format(
"Cannot start an export job since table %s does not exist",
@@ -113,13 +114,17 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
final String extractDestinationDir =
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(
+ datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
List<ResourceId> tempFiles =
executeExtract(
extractJobId,
tableToExtract,
jobService,
bqOptions.getProject(),
- extractDestinationDir);
+ extractDestinationDir,
+ bqLocation);
return new ExtractResult(schema, tempFiles);
}
@@ -160,11 +165,11 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
private List<ResourceId> executeExtract(
String jobId, TableReference table, JobService jobService, String executingProject,
- String extractDestinationDir)
+ String extractDestinationDir, String bqLocation)
throws InterruptedException, IOException {
- JobReference jobRef = new JobReference()
- .setProjectId(executingProject)
- .setJobId(jobId);
+
+ JobReference jobRef =
+ new JobReference().setProjectId(executingProject).setLocation(bqLocation).setJobId(jobId);
String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
JobConfigurationExtract extract = new JobConfigurationExtract()
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 339003d..cd128a1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -228,8 +228,6 @@ class WriteTables<DestinationT>
return writeTablesOutputs.get(mainOutputTag);
}
-
-
private void load(
JobService jobService,
DatasetService datasetService,
@@ -255,13 +253,20 @@ class WriteTables<DestinationT>
}
String projectId = ref.getProjectId();
Job lastFailedLoadJob = null;
+ String bqLocation =
+ BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());
for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
String jobId = jobIdPrefix + "-" + i;
- JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
+
+ JobReference jobRef =
+ new JobReference().setProjectId(projectId).setJobId(jobId).setLocation(bqLocation);
+
LOG.info("Loading {} files into {} using job {}, attempt {}", gcsUris.size(), ref, jobRef, i);
jobService.startLoadJob(jobRef, loadConfig);
LOG.info("Load job {} started", jobRef);
+
Job loadJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+
Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
switch (jobStatus) {
case SUCCEEDED:
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 6b4cf53..b6fbe49 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -614,7 +614,8 @@ public class BigQueryIOReadTest implements Serializable {
fakeBqServices,
TableRowJsonCoder.of(),
BigQueryIO.TableRowParser.INSTANCE,
- QueryPriority.BATCH);
+ QueryPriority.BATCH,
+ null);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
TableReference queryTable = new TableReference()
@@ -693,7 +694,8 @@ public class BigQueryIOReadTest implements Serializable {
fakeBqServices,
TableRowJsonCoder.of(),
BigQueryIO.TableRowParser.INSTANCE,
- QueryPriority.BATCH);
+ QueryPriority.BATCH,
+ null);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index e1edd83..ac715a3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -228,7 +228,7 @@ class FakeJobService implements JobService, Serializable {
}
@Override
- public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query)
+ public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query, String location)
throws InterruptedException, IOException {
synchronized (dryRunQueryResults) {
JobStatistics result = dryRunQueryResults.get(projectId, query.getQuery());
--
To stop receiving notification emails like this one, please contact
chamikara@apache.org.