You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/18 01:20:19 UTC

[1/2] beam git commit: This closes #2557

Repository: beam
Updated Branches:
  refs/heads/master a25c7d3cf -> 9b0cc9847


This closes #2557


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b0cc984
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b0cc984
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b0cc984

Branch: refs/heads/master
Commit: 9b0cc9847d8f0119b33961ec4768cf83dcd2b5fd
Parents: a25c7d3 b9e6577
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 17 18:20:07 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 17 18:20:07 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 59 +++++++-------------
 .../io/gcp/bigquery/BigQueryTableSource.java    | 30 +++++++++-
 2 files changed, 50 insertions(+), 39 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Set the Project of a Table Reference at Runtime

Posted by tg...@apache.org.
Set the Project of a Table Reference at Runtime

When a TableReference does not specify a project ID, fill it in from
BigQueryOptions.getProject() at job execution time instead of at job
submission time.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b9e65779
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b9e65779
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b9e65779

Branch: refs/heads/master
Commit: b9e657790c69ae4f9eead893655c595e34ded4da
Parents: a25c7d3
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 17 15:41:57 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 17 18:20:07 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 59 +++++++-------------
 .../io/gcp/bigquery/BigQueryTableSource.java    | 30 +++++++++-
 2 files changed, 50 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b9e65779/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f5f93b3..9753da5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -410,7 +410,7 @@ public class BigQueryIO {
         }
       }
 
-      ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
+      ValueProvider<TableReference> table = getTableProvider();
 
       checkState(
           table == null || getQuery() == null,
@@ -428,6 +428,12 @@ public class BigQueryIO {
             getUseLegacySql() == null,
             "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
                 + " preference, which only applies to queries");
+        if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
+          LOG.info(
+              "Project of {} not set. The value of {}.getProject() at execution time will be used.",
+              TableReference.class.getSimpleName(),
+              BigQueryOptions.class.getSimpleName());
+        }
       } else /* query != null */ {
         checkState(
             getFlattenResults() != null, "flattenResults should not be null if query is set");
@@ -495,10 +501,13 @@ public class BigQueryIO {
                 extractDestinationDir,
                 getBigQueryServices());
       } else {
-        ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
-        source = BigQueryTableSource.create(
-            jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(),
-            StaticValueProvider.of(executingProject));
+        source =
+            BigQueryTableSource.create(
+                jobIdToken,
+                getTableProvider(),
+                extractDestinationDir,
+                getBigQueryServices(),
+                StaticValueProvider.of(executingProject));
       }
       PassThroughThenCleanup.CleanupOperation cleanupOperation =
           new PassThroughThenCleanup.CleanupOperation() {
@@ -506,12 +515,12 @@ public class BigQueryIO {
             void cleanup(PipelineOptions options) throws Exception {
               BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
 
-              JobReference jobRef = new JobReference()
-                  .setProjectId(executingProject)
-                  .setJobId(getExtractJobId(jobIdToken));
+              JobReference jobRef =
+                  new JobReference()
+                      .setProjectId(executingProject)
+                      .setJobId(getExtractJobId(jobIdToken));
 
-              Job extractJob = getBigQueryServices().getJobService(bqOptions)
-                  .getJob(jobRef);
+              Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
 
               Collection<String> extractFiles = null;
               if (extractJob != null) {
@@ -526,7 +535,8 @@ public class BigQueryIO {
               if (extractFiles != null && !extractFiles.isEmpty()) {
                 new GcsUtilFactory().create(options).remove(extractFiles);
               }
-            }};
+            }
+          };
       return input.getPipeline()
           .apply(org.apache.beam.sdk.io.Read.from(source))
           .setCoder(getDefaultOutputCoder())
@@ -557,33 +567,6 @@ public class BigQueryIO {
 
     /**
      * Returns the table to read, or {@code null} if reading from a query instead.
-     *
-     * <p>If the table's project is not specified, use the executing project.
-     */
-    @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
-        BigQueryOptions bqOptions) {
-      ValueProvider<TableReference> table = getTableProvider();
-      if (table == null) {
-        return table;
-      }
-      if (!table.isAccessible()) {
-        LOG.info("Using a dynamic value for table input. This must contain a project"
-            + " in the table reference: {}", table);
-        return table;
-      }
-      if (Strings.isNullOrEmpty(table.get().getProjectId())) {
-        // If user does not specify a project we assume the table to be located in
-        // the default project.
-        TableReference tableRef = table.get();
-        tableRef.setProjectId(bqOptions.getProject());
-        return NestedValueProvider.of(StaticValueProvider.of(
-            BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef());
-      }
-      return table;
-    }
-
-    /**
-     * Returns the table to read, or {@code null} if reading from a query instead.
      */
     @Nullable
     public ValueProvider<TableReference> getTableProvider() {

http://git-wip-us.apache.org/repos/asf/beam/blob/b9e65779/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index cbd5781..22aba64 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
@@ -32,12 +33,15 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A {@link BigQuerySourceBase} for reading BigQuery tables.
  */
 @VisibleForTesting
 class BigQueryTableSource extends BigQuerySourceBase {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class);
 
   static BigQueryTableSource create(
       ValueProvider<String> jobIdToken,
@@ -66,7 +70,31 @@ class BigQueryTableSource extends BigQuerySourceBase {
   @Override
   protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
     checkState(jsonTable.isAccessible());
-    return BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
+    TableReference tableReference =
+        BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
+    return setDefaultProjectIfAbsent(bqOptions, tableReference);
+  }
+
+  /**
+   * Sets the {@link TableReference#projectId} of the provided table reference to the id of the
+   * default project if the table reference does not have a project ID specified.
+   */
+  private TableReference setDefaultProjectIfAbsent(
+      BigQueryOptions bqOptions, TableReference tableReference) {
+    if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+      checkState(
+          !Strings.isNullOrEmpty(bqOptions.getProject()),
+          "No project ID set in %s or %s, cannot construct a complete %s",
+          TableReference.class.getSimpleName(),
+          BigQueryOptions.class.getSimpleName(),
+          TableReference.class.getSimpleName());
+      LOG.info(
+          "Project ID not set in {}. Using default project from {}.",
+          TableReference.class.getSimpleName(),
+          BigQueryOptions.class.getSimpleName());
+      tableReference.setProjectId(bqOptions.getProject());
+    }
+    return tableReference;
   }
 
   @Override