You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/18 01:20:19 UTC
[1/2] beam git commit: This closes #2557
Repository: beam
Updated Branches:
refs/heads/master a25c7d3cf -> 9b0cc9847
This closes #2557
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b0cc984
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b0cc984
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b0cc984
Branch: refs/heads/master
Commit: 9b0cc9847d8f0119b33961ec4768cf83dcd2b5fd
Parents: a25c7d3 b9e6577
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 17 18:20:07 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 17 18:20:07 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 59 +++++++-------------
.../io/gcp/bigquery/BigQueryTableSource.java | 30 +++++++++-
2 files changed, 50 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Set the Project of a Table Reference at Runtime
Posted by tg...@apache.org.
Set the Project of a Table Reference at Runtime
When a table reference does not specify a project, default it to the project in
the pipeline options at job execution time instead of at job submission time.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b9e65779
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b9e65779
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b9e65779
Branch: refs/heads/master
Commit: b9e657790c69ae4f9eead893655c595e34ded4da
Parents: a25c7d3
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 17 15:41:57 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 17 18:20:07 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 59 +++++++-------------
.../io/gcp/bigquery/BigQueryTableSource.java | 30 +++++++++-
2 files changed, 50 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b9e65779/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index f5f93b3..9753da5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -410,7 +410,7 @@ public class BigQueryIO {
}
}
- ValueProvider<TableReference> table = getTableWithDefaultProject(bqOptions);
+ ValueProvider<TableReference> table = getTableProvider();
checkState(
table == null || getQuery() == null,
@@ -428,6 +428,12 @@ public class BigQueryIO {
getUseLegacySql() == null,
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+ " preference, which only applies to queries");
+ if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
+ LOG.info(
+ "Project of {} not set. The value of {}.getProject() at execution time will be used.",
+ TableReference.class.getSimpleName(),
+ BigQueryOptions.class.getSimpleName());
+ }
} else /* query != null */ {
checkState(
getFlattenResults() != null, "flattenResults should not be null if query is set");
@@ -495,10 +501,13 @@ public class BigQueryIO {
extractDestinationDir,
getBigQueryServices());
} else {
- ValueProvider<TableReference> inputTable = getTableWithDefaultProject(bqOptions);
- source = BigQueryTableSource.create(
- jobIdToken, inputTable, extractDestinationDir, getBigQueryServices(),
- StaticValueProvider.of(executingProject));
+ source =
+ BigQueryTableSource.create(
+ jobIdToken,
+ getTableProvider(),
+ extractDestinationDir,
+ getBigQueryServices(),
+ StaticValueProvider.of(executingProject));
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new PassThroughThenCleanup.CleanupOperation() {
@@ -506,12 +515,12 @@ public class BigQueryIO {
void cleanup(PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
- JobReference jobRef = new JobReference()
- .setProjectId(executingProject)
- .setJobId(getExtractJobId(jobIdToken));
+ JobReference jobRef =
+ new JobReference()
+ .setProjectId(executingProject)
+ .setJobId(getExtractJobId(jobIdToken));
- Job extractJob = getBigQueryServices().getJobService(bqOptions)
- .getJob(jobRef);
+ Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
Collection<String> extractFiles = null;
if (extractJob != null) {
@@ -526,7 +535,8 @@ public class BigQueryIO {
if (extractFiles != null && !extractFiles.isEmpty()) {
new GcsUtilFactory().create(options).remove(extractFiles);
}
- }};
+ }
+ };
return input.getPipeline()
.apply(org.apache.beam.sdk.io.Read.from(source))
.setCoder(getDefaultOutputCoder())
@@ -557,33 +567,6 @@ public class BigQueryIO {
/**
* Returns the table to read, or {@code null} if reading from a query instead.
- *
- * <p>If the table's project is not specified, use the executing project.
- */
- @Nullable ValueProvider<TableReference> getTableWithDefaultProject(
- BigQueryOptions bqOptions) {
- ValueProvider<TableReference> table = getTableProvider();
- if (table == null) {
- return table;
- }
- if (!table.isAccessible()) {
- LOG.info("Using a dynamic value for table input. This must contain a project"
- + " in the table reference: {}", table);
- return table;
- }
- if (Strings.isNullOrEmpty(table.get().getProjectId())) {
- // If user does not specify a project we assume the table to be located in
- // the default project.
- TableReference tableRef = table.get();
- tableRef.setProjectId(bqOptions.getProject());
- return NestedValueProvider.of(StaticValueProvider.of(
- BigQueryHelpers.toJsonString(tableRef)), new JsonTableRefToTableRef());
- }
- return table;
- }
-
- /**
- * Returns the table to read, or {@code null} if reading from a query instead.
*/
@Nullable
public ValueProvider<TableReference> getTableProvider() {
http://git-wip-us.apache.org/repos/asf/beam/blob/b9e65779/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
index cbd5781..22aba64 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryTableSource.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
@@ -32,12 +33,15 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link BigQuerySourceBase} for reading BigQuery tables.
*/
@VisibleForTesting
class BigQueryTableSource extends BigQuerySourceBase {
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableSource.class);
static BigQueryTableSource create(
ValueProvider<String> jobIdToken,
@@ -66,7 +70,31 @@ class BigQueryTableSource extends BigQuerySourceBase {
@Override
protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
checkState(jsonTable.isAccessible());
- return BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
+ TableReference tableReference =
+ BigQueryIO.JSON_FACTORY.fromString(jsonTable.get(), TableReference.class);
+ return setDefaultProjectIfAbsent(bqOptions, tableReference);
+ }
+
+ /**
+ * Sets the {@link TableReference#projectId} of the provided table reference to the id of the
+ * default project if the table reference does not have a project ID specified.
+ */
+ private TableReference setDefaultProjectIfAbsent(
+ BigQueryOptions bqOptions, TableReference tableReference) {
+ if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+ checkState(
+ !Strings.isNullOrEmpty(bqOptions.getProject()),
+ "No project ID set in %s or %s, cannot construct a complete %s",
+ TableReference.class.getSimpleName(),
+ BigQueryOptions.class.getSimpleName(),
+ TableReference.class.getSimpleName());
+ LOG.info(
+ "Project ID not set in {}. Using default project from {}.",
+ TableReference.class.getSimpleName(),
+ BigQueryOptions.class.getSimpleName());
+ tableReference.setProjectId(bqOptions.getProject());
+ }
+ return tableReference;
}
@Override