You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/04/04 21:20:30 UTC

[beam] branch master updated: [BEAM-3774] Adds support for reading from/writing to more BQ geographical locations (#5001)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 20405c3  [BEAM-3774] Adds support for reading from/writing to more BQ geographical locations (#5001)
20405c3 is described below

commit 20405c3eb5d5a58176ab93e62fa730f76758e208
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Wed Apr 4 14:20:23 2018 -0700

    [BEAM-3774] Adds support for reading from/writing to more BQ geographical locations (#5001)
    
    * Adds support for reading from/writing to BigQuery datasets that are not in US or EU locations.
    
    * Addressing reviewer comments.
---
 .../beam/sdk/io/gcp/bigquery/BigQueryHelpers.java  | 21 ++++++++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java       | 28 ++++++++++-
 .../sdk/io/gcp/bigquery/BigQueryQuerySource.java   | 54 +++++++++++++++-------
 .../beam/sdk/io/gcp/bigquery/BigQueryServices.java |  2 +-
 .../sdk/io/gcp/bigquery/BigQueryServicesImpl.java  | 10 ++--
 .../sdk/io/gcp/bigquery/BigQuerySourceBase.java    | 17 ++++---
 .../beam/sdk/io/gcp/bigquery/WriteTables.java      | 11 +++--
 .../sdk/io/gcp/bigquery/BigQueryIOReadTest.java    |  6 ++-
 .../beam/sdk/io/gcp/bigquery/FakeJobService.java   |  2 +-
 9 files changed, 115 insertions(+), 36 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
index 96a0622..29b405b 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryHelpers.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.api.services.bigquery.model.Dataset;
 import com.google.api.services.bigquery.model.Job;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.TableReference;
@@ -203,12 +204,28 @@ public class BigQueryHelpers {
       } else {
         throw new RuntimeException(
             String.format(
-                UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)),
-            e);
+                UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)), e);
       }
     }
   }
 
+  static String getDatasetLocation(
+      DatasetService datasetService, String projectId, String datasetId) {
+    Dataset dataset;
+    try {
+      dataset = datasetService.getDataset(projectId, datasetId);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      throw new RuntimeException(
+          String.format(
+              "unable to obtain dataset for dataset %s in project %s", datasetId, projectId),
+          e);
+    }
+    return dataset.getLocation();
+  }
+
   static void verifyTablePresence(DatasetService datasetService, TableReference table) {
     try {
       datasetService.getTable(table);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 88de9b4..fab238c 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -170,6 +170,13 @@ import org.slf4j.LoggerFactory;
  *     .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
  * }</pre>
  *
+ * <p>Users can optionally specify a query priority using {@link TypedRead#withQueryPriority(
+ * TypedRead.QueryPriority)} and a geographic location where the query will be executed using {@link
+ * TypedRead#withQueryLocation(String)}. Query location must be specified for jobs that are not
+ * executed in US or EU. See <a
+ * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs:
+ * query</a>.
+ *
  * <h3>Writing</h3>
  *
  * <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes a
@@ -549,6 +556,7 @@ public class BigQueryIO {
       abstract Builder<T> setWithTemplateCompatibility(Boolean useTemplateCompatibility);
       abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
       abstract Builder<T> setQueryPriority(QueryPriority priority);
+      abstract Builder<T> setQueryLocation(String location);
       abstract TypedRead<T> build();
 
       abstract Builder<T> setParseFn(
@@ -570,6 +578,8 @@ public class BigQueryIO {
 
     @Nullable abstract QueryPriority getQueryPriority();
 
+    @Nullable abstract String getQueryLocation();
+
     @Nullable abstract Coder<T> getCoder();
 
     /**
@@ -632,7 +642,8 @@ public class BigQueryIO {
                 getBigQueryServices(),
                 coder,
                 getParseFn(),
-                MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH));
+                MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
+                getQueryLocation());
       }
       return source;
     }
@@ -687,7 +698,8 @@ public class BigQueryIO {
                 new JobConfigurationQuery()
                     .setQuery(getQuery().get())
                     .setFlattenResults(getFlattenResults())
-                    .setUseLegacySql(getUseLegacySql()));
+                    .setUseLegacySql(getUseLegacySql()),
+                getQueryLocation());
           } catch (Exception e) {
             throw new IllegalArgumentException(
                 String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
@@ -939,6 +951,18 @@ public class BigQueryIO {
       return toBuilder().setQueryPriority(priority).build();
     }
 
+    /**
+     * BigQuery geographic location where the query <a
+     * href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs">job</a> will be
+     * executed. If not specified, Beam tries to determine the location by examining the tables
+     * referenced by the query. Location must be specified for queries not executed in US or EU. See
+     * <a href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs:
+     * query</a>.
+     */
+    public TypedRead<T> withQueryLocation(String location) {
+      return toBuilder().setQueryLocation(location).build();
+    }
+
     @Experimental(Experimental.Kind.SOURCE_SINK)
     public TypedRead<T> withTemplateCompatibility() {
       return toBuilder().setWithTemplateCompatibility(true).build();
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
index 34d7c68..f380b7d 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQuerySource.java
@@ -59,9 +59,18 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
       BigQueryServices bqServices,
       Coder<T> coder,
       SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority) {
+      QueryPriority priority,
+      String location) {
     return new BigQueryQuerySource<>(
-        stepUuid, query, flattenResults, useLegacySql, bqServices, coder, parseFn, priority);
+        stepUuid,
+        query,
+        flattenResults,
+        useLegacySql,
+        bqServices,
+        coder,
+        parseFn,
+        priority,
+        location);
   }
 
   private final ValueProvider<String> query;
@@ -69,6 +78,7 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
   private final Boolean useLegacySql;
   private transient AtomicReference<JobStatistics> dryRunJobStats;
   private final QueryPriority priority;
+  private final String location;
 
   private BigQueryQuerySource(
       String stepUuid,
@@ -78,13 +88,15 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
       BigQueryServices bqServices,
       Coder<T> coder,
       SerializableFunction<SchemaAndRecord, T> parseFn,
-      QueryPriority priority) {
+      QueryPriority priority,
+      String location) {
     super(stepUuid, bqServices, coder, parseFn);
     this.query = checkNotNull(query, "query");
     this.flattenResults = checkNotNull(flattenResults, "flattenResults");
     this.useLegacySql = checkNotNull(useLegacySql, "useLegacySql");
     this.dryRunJobStats = new AtomicReference<>();
     this.priority = priority;
+    this.location = location;
   }
 
   @Override
@@ -97,13 +109,17 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
   protected TableReference getTableToExtract(BigQueryOptions bqOptions)
       throws IOException, InterruptedException {
     // 1. Find the location of the query.
-    String location = null;
-    List<TableReference> referencedTables =
-        dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+    String location = this.location;
     DatasetService tableService = bqServices.getDatasetService(bqOptions);
-    if (referencedTables != null && !referencedTables.isEmpty()) {
-      TableReference queryTable = referencedTables.get(0);
-      location = tableService.getTable(queryTable).getLocation();
+    if (location == null) {
+      // If location was not provided we try to determine it from the tables referenced by the
+      // Query. This will only work for BQ locations US and EU.
+      List<TableReference> referencedTables =
+          dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
+      if (referencedTables != null && !referencedTables.isEmpty()) {
+        TableReference queryTable = referencedTables.get(0);
+        location = tableService.getTable(queryTable).getLocation();
+      }
     }
 
     String jobIdToken = createJobIdToken(bqOptions.getJobName(), stepUuid);
@@ -125,7 +141,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
 
     // 3. Execute the query.
     executeQuery(
-        jobIdToken, bqOptions.getProject(), tableToExtract, bqServices.getJobService(bqOptions));
+        jobIdToken, bqOptions.getProject(), tableToExtract, bqServices.getJobService(bqOptions),
+        location);
 
     return tableToExtract;
   }
@@ -151,8 +168,10 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
   private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions)
       throws InterruptedException, IOException {
     if (dryRunJobStats.get() == null) {
-      JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(
-          bqOptions.getProject(), createBasicQueryConfig());
+      JobStatistics jobStats =
+          bqServices
+              .getJobService(bqOptions)
+              .dryRunQuery(bqOptions.getProject(), createBasicQueryConfig(), this.location);
       dryRunJobStats.compareAndSet(null, jobStats);
     }
     return dryRunJobStats.get();
@@ -162,7 +181,8 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
       String jobIdToken,
       String executingProject,
       TableReference destinationTable,
-      JobService jobService) throws IOException, InterruptedException {
+      JobService jobService,
+      String bqLocation) throws IOException, InterruptedException {
     // Generate a transient (random) query job ID, because this code may be retried after the
     // temporary dataset and table have already been deleted by a previous attempt -
     // in that case we want to re-generate the temporary dataset and table, and we'll need
@@ -174,9 +194,11 @@ class BigQueryQuerySource<T> extends BigQuerySourceBase<T> {
         destinationTable,
         queryJobId);
 
-    JobReference jobRef = new JobReference()
-        .setProjectId(executingProject)
-        .setJobId(queryJobId);
+    JobReference jobRef =
+        new JobReference()
+            .setProjectId(executingProject)
+            .setLocation(bqLocation)
+            .setJobId(queryJobId);
 
     JobConfigurationQuery queryConfig = createBasicQueryConfig()
         .setAllowLargeResults(true)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index dde005d..1295cc0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -85,7 +85,7 @@ interface BigQueryServices extends Serializable {
     /**
      * Dry runs the query in the given project.
      */
-    JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig)
+    JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig, String location)
         throws InterruptedException, IOException;
 
     /**
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index 6c76688..9771733 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -254,7 +254,9 @@ class BigQueryServicesImpl implements BigQueryServices {
         BackOff backoff) throws InterruptedException {
       do {
         try {
-          Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
+          Job job = client.jobs().get(
+                  jobRef.getProjectId(), jobRef.getJobId()).setLocation(
+                          jobRef.getLocation()).execute();
           JobStatus status = job.getStatus();
           if (status != null && status.getState() != null && status.getState().equals("DONE")) {
             LOG.info("BigQuery job {} completed in state DONE", jobRef);
@@ -281,9 +283,11 @@ class BigQueryServicesImpl implements BigQueryServices {
     }
 
     @Override
-    public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig)
+    public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery queryConfig,
+                                     String location)
         throws InterruptedException, IOException {
-      Job job = new Job()
+      JobReference jobRef = new JobReference().setLocation(location).setProjectId(projectId);
+      Job job = new Job().setJobReference(jobRef)
           .setConfiguration(new JobConfiguration()
               .setQuery(queryConfig)
               .setDryRun(true));
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
index a15afed..6d82c56 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java
@@ -101,7 +101,8 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
   protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     TableReference tableToExtract = getTableToExtract(bqOptions);
-    Table table = bqServices.getDatasetService(bqOptions).getTable(tableToExtract);
+    BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions);
+    Table table = datasetService.getTable(tableToExtract);
     if (table == null) {
       throw new IOException(String.format(
               "Cannot start an export job since table %s does not exist",
@@ -113,13 +114,17 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
     String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
     final String extractDestinationDir =
         resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(
+            datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
     List<ResourceId> tempFiles =
         executeExtract(
             extractJobId,
             tableToExtract,
             jobService,
             bqOptions.getProject(),
-            extractDestinationDir);
+            extractDestinationDir,
+            bqLocation);
     return new ExtractResult(schema, tempFiles);
   }
 
@@ -160,11 +165,11 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
 
   private List<ResourceId> executeExtract(
       String jobId, TableReference table, JobService jobService, String executingProject,
-      String extractDestinationDir)
+      String extractDestinationDir, String bqLocation)
           throws InterruptedException, IOException {
-    JobReference jobRef = new JobReference()
-        .setProjectId(executingProject)
-        .setJobId(jobId);
+
+    JobReference jobRef =
+        new JobReference().setProjectId(executingProject).setLocation(bqLocation).setJobId(jobId);
 
     String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
     JobConfigurationExtract extract = new JobConfigurationExtract()
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 339003d..cd128a1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -228,8 +228,6 @@ class WriteTables<DestinationT>
     return writeTablesOutputs.get(mainOutputTag);
   }
 
-
-
   private void load(
       JobService jobService,
       DatasetService datasetService,
@@ -255,13 +253,20 @@ class WriteTables<DestinationT>
     }
     String projectId = ref.getProjectId();
     Job lastFailedLoadJob = null;
+    String bqLocation =
+        BigQueryHelpers.getDatasetLocation(datasetService, ref.getProjectId(), ref.getDatasetId());
     for (int i = 0; i < BatchLoads.MAX_RETRY_JOBS; ++i) {
       String jobId = jobIdPrefix + "-" + i;
-      JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
+
+      JobReference jobRef =
+          new JobReference().setProjectId(projectId).setJobId(jobId).setLocation(bqLocation);
+
       LOG.info("Loading {} files into {} using job {}, attempt {}", gcsUris.size(), ref, jobRef, i);
       jobService.startLoadJob(jobRef, loadConfig);
       LOG.info("Load job {} started", jobRef);
+
       Job loadJob = jobService.pollJob(jobRef, BatchLoads.LOAD_JOB_POLL_MAX_RETRIES);
+
       Status jobStatus = BigQueryHelpers.parseStatus(loadJob);
       switch (jobStatus) {
         case SUCCEEDED:
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
index 6b4cf53..b6fbe49 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOReadTest.java
@@ -614,7 +614,8 @@ public class BigQueryIOReadTest implements Serializable {
         fakeBqServices,
         TableRowJsonCoder.of(),
         BigQueryIO.TableRowParser.INSTANCE,
-        QueryPriority.BATCH);
+        QueryPriority.BATCH,
+        null);
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
     TableReference queryTable = new TableReference()
@@ -693,7 +694,8 @@ public class BigQueryIOReadTest implements Serializable {
         fakeBqServices,
         TableRowJsonCoder.of(),
         BigQueryIO.TableRowParser.INSTANCE,
-        QueryPriority.BATCH);
+        QueryPriority.BATCH,
+        null);
 
     options.setTempLocation(testFolder.getRoot().getAbsolutePath());
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index e1edd83..ac715a3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -228,7 +228,7 @@ class FakeJobService implements JobService, Serializable {
   }
 
   @Override
-  public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query)
+  public JobStatistics dryRunQuery(String projectId, JobConfigurationQuery query, String location)
       throws InterruptedException, IOException {
     synchronized (dryRunQueryResults) {
       JobStatistics result = dryRunQueryResults.get(projectId, query.getQuery());

-- 
To stop receiving notification emails like this one, please contact
chamikara@apache.org.